Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class TComplicatedTask(SimplestTask):
    """Task that logs metrics of several value types, then delegates to SimplestTask.run."""

    specific_input = data.target
    task_input = data.target
    some_param = parameter.value(1)

    def run(self):
        # One log_metric call per pair; order matches the original explicit calls.
        for metric_name, metric_value in (
            ("some_metric", 1),
            ("some_metric1", 2.0),
            ("m_string", "my_metric"),
            ("m_tuple", (1, 2, "complicated")),
        ):
            self.log_metric(metric_name, metric_value)
        super(TComplicatedTask, self).run()
class TNestedPipeline1(PipelineTask):
    """Pipeline wiring a single AT_1 task; carries a deliberately long default parameter value."""

    parameter_with_huge_value = parameter.value(default="some_v2_" * 20)
    some_output = output

    def band(self):
        inner_task = AT_1(simplest_param="2")
        self.some_output = inner_task.simplest_output
class TNestedPipeline2(PipelineTask):
    """Second variant of the nested pipeline, declaring its long parameter via bare `parameter`."""

    some_output = output
    parameter_with_huge_value = parameter(default="some_v1_" * 20)

    def band(self):
        inner_task = AT_1(simplest_param="2")
        self.some_output = inner_task.simplest_output
class TSuperNestedPipeline(PipelineTask):
class TError2RunTask(TTask):
    """Task whose run() always fails, simulating a user-code error for error-handling tests."""

    def run(self):
        # Unconditional failure by design.
        raise Exception("Some user error")
class TLongTimeRunning(TTask):
    """Task that optionally sleeps, runs the base task, and then always raises an error."""

    # Delay in seconds; 0 (the default) skips the sleep entirely.
    sleep = parameter.value(default=0)

    def run(self):
        delay = self.sleep
        if delay:
            sleep(delay)
        super(TLongTimeRunning, self).run()
        # Failure is intentional — downstream tests expect this task to error out.
        raise Exception("Some user error")
class TNestedPipeline(PipelineTask):
    """Wraps TLongTimeRunning so its output can be consumed by an outer pipeline."""

    long_time_run = output

    def band(self):
        long_task = TLongTimeRunning()
        self.long_time_run = long_task.simplest_output
class TPipeWithErrors(PipelineTask):
    """Pipeline combining two failing tasks and a nested pipeline, for error-propagation tests."""

    t1 = output
    t2 = output
    t_pipeline = output

    def band(self):
        self.t1 = TErrorRunTask()
        self.t2 = TError2RunTask()
        nested = TNestedPipeline()
        self.t_pipeline = nested.long_time_run
chars_read += len(line)
self.log_metric("chars_read", chars_read)
self.log_artifact("figure.png", self.gen_hist(result))
self.counters.write(json.dumps(result))
def gen_hist(self, result):
    """Build a bar chart of the five most common entries in *result* and return the figure.

    `result` is expected to support Counter-style most_common() — TODO confirm at call site.
    """
    # Imported lazily so matplotlib is only required when an artifact is produced.
    import matplotlib.pyplot as plt

    top_words, top_counts = zip(*result.most_common(5))
    plt.bar(top_words, top_counts)
    return plt.gcf()
class WordCountPipeline(PipelineTask):
    """Pipeline that runs WordCount over a bundled sample log file."""

    counter = output

    def band(self):
        word_count = WordCount(text=scenario_path("data/some_log.txt"))
        self.counter = word_count.counters
def test_word_count():
    """Run WordCount over this source file and verify the chars_read metric was persisted."""
    task = WordCount(text=__file__)
    task.dbnd_run()

    attempt_folder = task.ctrl.last_task_run.attempt_folder
    chars_read = os.path.join(attempt_folder.path, "metrics", "user", "chars_read")
    artifact = os.path.join(attempt_folder.path, "artifacts", "figure.png")

    assert os.path.exists(chars_read)
def test_subtype_not_in_simpletype(self):
    """Declaring a parameter whose sub_type conflicts with the main value type must fail the build."""
    # pytest.raises(message=...) was removed in pytest 5.0 and now raises TypeError;
    # `match` asserts the raised exception's text instead, which is what this test needs.
    with pytest.raises(DatabandBuildError, match="is not supported by main value"):

        class MyPTWithError(PipelineTask):
            wrong_param = parameter.sub_type(bool)[int]

            def band(self):
                return None
def test_band_ret_dict(self):
    """A PipelineTask band() may return a task directly; the pipeline should still run."""

    class TRetTask(PipelineTask):
        def band(self):
            return TTask(t_param=1)

    pipeline = TRetTask()
    assert_run_task(pipeline)
o_1 = output[str]
def run(self):
    """Pass the input parameter straight through as this task's output."""
    value = self.p1
    self.o_1 = value
class T2(PythonTask):
    """Task that always fails at run time instead of producing its output."""

    p1 = parameter.value("somep")
    o_1 = output[str]

    def run(self):
        # Intentionally fail without assigning o_1.
        raise Exception()
class TPipe(PipelineTask):
    """Chains T1's output into T2 so that T2's run-time failure surfaces through the pipeline."""

    o_1 = output[str]
    o_2 = output[str]

    def band(self):
        first = T1()
        self.o_1 = first.o_1
        self.o_2 = T2(p1=self.o_1)
if __name__ == "__main__":
    # Run the pipeline directly; task_version is overridden with "now",
    # presumably to force a fresh run rather than reuse cached results — TODO confirm.
    TPipe(override={T1.task_version: "now"}).dbnd_run()
class A(TTask):
    """Plain subclass of TTask with no additional behavior."""
class B(A):
    """Task that consumes `tt` and materializes two output targets as empty files."""

    x = parameter[str]
    tt = data
    o_first = output.data
    o_second = output.data

    def run(self):
        super(B, self).run()
        # touch() both outputs so the targets exist on disk.
        for target in (self.o_first, self.o_second):
            target.as_object.touch()
class CPipeline(PipelineTask):
    """Pipeline wiring A's output into B, exposing B itself plus its second output."""

    x = parameter[str]
    a_output = output.data
    b_main = output.data
    b_output = output.data

    def band(self):
        self.a_output = A(task_name="A_simple")
        b_task = B(task_name="B_simple", tt=self.a_output, x=self.x)
        self.b_main = b_task
        self.b_output = b_task.o_second
# Build and execute the pipeline, then sanity-check the pipeline object exists.
c_pipeline = CPipeline(x="some_x_values")
c_pipeline.dbnd_run()
assert c_pipeline
def test_task_input_via_band1(self, file_on_disk):
    """A task input can be wired inside band() from a plain file path."""

    class TTAskWithInputTask1(PipelineTask):
        t_output = output.data

        def band(self):
            self.t_output = TTaskWithInput(t_input=file_on_disk.path)

    pipeline = TTAskWithInputTask1()
    assert_run_task(pipeline)
split_selector = parameter[str]
selected_split = output
def band(self):
    """Log every available split at build time, then select the one named by split_selector."""
    logging.info(
        "All logs will be printed on task creation - before the real execution"
    )
    all_splits = DataSplit().splits
    for split_key, split_value in six.iteritems(all_splits):
        logging.info("split %s %s", split_key, split_value)
    self.selected_split = all_splits[self.split_selector]
# here we wrap the pipeline -- if we want to do that
class DataSplitUsage(PipelineTask):
    """Pipeline wrapper that pins DataSplitBand to split "0"."""

    selected_split = output

    def band(self):
        band_task = DataSplitBand(split_selector="0")
        self.selected_split = band_task.selected_split
# and here the regular dbnd pipeline implementation, notice, we are selecting part "1" this time
@pipeline
def data_split_selector():
    """Return split "1" produced by DataSplit."""
    splits = DataSplit().splits
    return splits["1"]
value_type=value_origin.value_type,
source=value_origin.origin_target.source,
path=path,
)
call_kwargs.setdefault("task_is_dynamic", True)
call_kwargs.setdefault(
"task_in_memory_outputs", parent_task.settings.dynamic_task.in_memory_outputs
)
# in case of pipeline - we'd like to run it as regular task
# if False and issubclass(task_cls, PipelineTask):
# # TODO: do we want to support this behavior
# task_cls = task(task_cls._conf__decorator_spec.item).task_cls
if issubclass(task_cls, PipelineTask):
# if it's pipeline - create new databand run
# create override _task_default_result to be object instead of target
task_cls = pipeline(
task_cls._conf__decorator_spec.item, _task_default_result=_default_output
).task_cls
# instantiate inline pipeline
t = task_cls(*call_args, **call_kwargs)
return t
else:
# instantiate inline task
t = task_cls(*call_args, **call_kwargs)
# update upstream/downstream relations - needed for correct tracking
# we can have the task as upstream , as it was executed already
if not parent_task.task_dag.has_upstream(t):