def test_workflow_task_ids_with_unstarted_workflow(self):
    workflow = self.gbdx.Workflow([])

    with self.assertRaises(WorkflowError):
        task_ids = workflow.task_ids

def test_create_simpleworkflow(self):
    aoptask = self.gbdx.Task("AOP_Strip_Processor", data='dummy', enable_acomp=False, enable_pansharpen=False)
    s3task = self.gbdx.Task("StageDataToS3", data=aoptask.get_output('log'), destination='dummydestination')
    workflow = self.gbdx.Workflow([s3task, aoptask])

    assert isinstance(workflow, Workflow)
    assert workflow.id is None
    assert workflow.name is not None

    # status is only available once the workflow has been executed
    with self.assertRaises(WorkflowError):
        workflow.status

    assert not workflow.complete

@property
def stdout(self):
    '''
    Get stdout from all of the tasks in a workflow.

    Returns:
        (list): tasks with their stdout

    Example:
        >>> workflow.stdout
        [
            {
                "id": "4488895771403082552",
                "taskType": "AOP_Strip_Processor",
                "name": "Task1",
                "stdout": "............"
            }
        ]
    '''
    if not self.id:
        raise WorkflowError('Workflow is not running. Cannot get stdout.')

    if self.batch_values:
        raise NotImplementedError("Query each workflow ID within the batch workflow for stdout.")

    wf = self.workflow.get(self.id)

    stdout_list = []
    for task in wf['tasks']:
        stdout_list.append(
            {
                'id': task['id'],
                'taskType': task['taskType'],
                'name': task['name'],
                'stdout': self.workflow.get_stdout(self.id, task['id'])
            }
        )

    return stdout_list

@property
def status(self):
    '''
    Get the status of a running workflow.

    Returns:
        Workflow status
    '''
    if not self.id:
        raise WorkflowError('Workflow is not running. Cannot check status.')

    if self.batch_values:
        status = self.workflow.batch_workflow_status(self.id)
    else:
        status = self.workflow.status(self.id)

    return status
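
# Usage sketch: `status` raises WorkflowError until the workflow has been
# launched. This assumes `gbdx` is an authenticated gbdxtools Interface and
# that `Workflow.execute()` submits the workflow (not shown in this excerpt):
#
#     workflow = gbdx.Workflow([aoptask, s3task])
#     workflow.execute()
#     print(workflow.status)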
def cancel(self):
    '''
    Cancel a running workflow.

    Args:
        None

    Returns:
        None
    '''
    if not self.id:
        raise WorkflowError('Workflow is not running. Cannot cancel.')

    if self.batch_values:
        self.workflow.batch_workflow_cancel(self.id)
    else:
        self.workflow.cancel(self.id)
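
# Usage sketch: like `status`, cancel only applies to an executed workflow;
# `execute()` is assumed from the surrounding codebase, not this excerpt:
#
#     workflow.execute()   # assigns workflow.id
#     workflow.cancel()    # standard or batch cancel, depending on batch_values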
def generate_workflow_description(self):
    '''
    Generate the workflow JSON for launching the workflow against the GBDX API.

    Args:
        None

    Returns:
        JSON string
    '''
    if not self.tasks:
        raise WorkflowError('Workflow contains no tasks, and cannot be executed.')

    self.definition = self.workflow_skeleton()

    if self.batch_values:
        self.definition["batch_values"] = self.batch_values

    all_input_port_values = [t.inputs.__getattribute__(input_port_name).value
                             for t in self.tasks
                             for input_port_name in t.inputs._portnames]

    for task in self.tasks:
        # Only include multiplex output ports in this task if other tasks refer
        # to them in their inputs; skip this check if there is only one task in
        # the workflow:
        #   1. find the multiplex output port names in this task
        #   2. see if they are referred to in any other task's inputs
        #   3. if not, exclude them from the workflow definition
        output_multiplex_ports_to_exclude = []
        if len(self.tasks) > 1:
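            # A minimal sketch of steps 1-3 above, assuming each output port
            # exposes an `is_multiplex` flag and that input port values
            # reference task outputs as 'source:<task_name>:<port_name>' (the
            # format produced by get_output in the test above); these names are
            # assumptions, not confirmed by this excerpt.
            multiplex_output_port_names = [
                port_name for port_name in task.outputs._portnames
                if task.outputs.__getattribute__(port_name).is_multiplex
            ]
            for port_name in multiplex_output_port_names:
                reference = 'source:' + task.name + ':' + port_name
                if reference not in all_input_port_values:
                    output_multiplex_ports_to_exclude.append(port_name)

        # Hypothetical completion: serialize each task without the excluded
        # ports and collect it into the definition;
        # `generate_task_workflow_json` is an assumed helper name.
        task_def = task.generate_task_workflow_json(
            output_multiplex_ports_to_exclude=output_multiplex_ports_to_exclude)
        self.definition['tasks'].append(task_def)

    return self.definition
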
@property
def task_ids(self):
    '''
    Get the task IDs of a running workflow.

    Args:
        None

    Returns:
        List of task IDs
    '''
    if not self.id:
        raise WorkflowError('Workflow is not running. Cannot get task IDs.')

    if self.batch_values:
        raise NotImplementedError("Query each workflow ID within the batch workflow for task IDs.")

    wf = self.workflow.get(self.id)
    return [task['id'] for task in wf['tasks']]
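
# Usage sketch, assuming an executed workflow (the first test above shows the
# error raised on an unstarted one):
#
#     workflow.execute()
#     for task_id in workflow.task_ids:
#         print(task_id)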
@property
def events(self):
    '''
    Get the events of a running workflow.

    Returns:
        List of workflow events
    '''
    if not self.id:
        raise WorkflowError('Workflow is not running. Cannot get events.')

    if self.batch_values:
        raise NotImplementedError("Query each workflow ID within the batch workflow for events.")

    return self.workflow.events(self.id)
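
# Usage sketch: the event payload shape is not shown in this excerpt, so this
# simply iterates whatever the workflow API returns:
#
#     for event in workflow.events:
#         print(event)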
@property
def stderr(self):
    '''
    Get stderr from all of the tasks in a workflow.

    Returns:
        (list): tasks with their stderr

    Example:
        >>> workflow.stderr
        [
            {
                "id": "4488895771403082552",
                "taskType": "AOP_Strip_Processor",
                "name": "Task1",
                "stderr": "............"
            }
        ]
    '''
    if not self.id:
        raise WorkflowError('Workflow is not running. Cannot get stderr.')

    if self.batch_values:
        raise NotImplementedError("Query each workflow ID within the batch workflow for stderr.")

    wf = self.workflow.get(self.id)

    stderr_list = []
    for task in wf['tasks']:
        stderr_list.append(
            {
                'id': task['id'],
                'taskType': task['taskType'],
                'name': task['name'],
                'stderr': self.workflow.get_stderr(self.id, task['id'])
            }
        )

    return stderr_list