from dbnd import auto_namespace, band, parameter
from dbnd._core.constants import CloudType
from dbnd_test_scenarios.test_common.task.factories import FooConfig, TTask

auto_namespace(scope=__name__)


class FirstTask(TTask):
    # exists only for local
    foo = parameter(default="FooConfig")[FooConfig]
    param = parameter(default="FirstTask.inline.param")[str]


class SecondTask(FirstTask):
    defaults = {
        FooConfig.bar: "SecondTask.defaults.bar",
        FooConfig.quz: "SecondTask.defaults.quz",
    }


@band(defaults={FooConfig.bar: "first_pipeline.defaults.bar"})
def first_pipeline():
    return SecondTask(param="first_pipeline.band.param").t_output

@band(defaults={FooConfig.quz: "second_pipeline.defaults.quz"})
def second_pipeline():
    # assumed completion of a truncated snippet, mirroring first_pipeline above
    return SecondTask(param="second_pipeline.band.param").t_output
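
# A minimal usage sketch, not part of the original scenario: running a band by
# name with dbnd_run_cmd (as other tests in this collection do) exercises the
# layered defaults declared above. The helper name below is hypothetical.
def run_defaults_example():
    from dbnd import dbnd_run_cmd

    return dbnd_run_cmd(["first_pipeline"])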

def test_task_caching(self):
    class DummyTask(Task):
        x = parameter[str]

    dummy_1 = DummyTask(x=1)
    dummy_2 = DummyTask(x=2)
    dummy_1b = DummyTask(x=1)

    assert dummy_1 != dummy_2
    assert dummy_1 == dummy_1b

def run(self):
    super(TTaskWithMetricsAndInput, self).run()

class FooBaseTask(TTask):
    """
    used by all command line checkers
    """

    pass


class FooConfig(Config):
    bar = parameter(default="from_config")[str]
    quz = parameter(default="from_config")[str]

class TConfig(Config):
    _conf__task_family = "tconfig"

    config_value_s1 = parameter[str]
    config_value_s2 = parameter[str]

@task
def ttask_simple(tparam="1"):
    # type: (str) -> str
    return "result %s" % tparam
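
# A minimal usage sketch (assumption: a @task-decorated function can still be
# called as a plain Python function in-process and returns its value; the
# helper name below is hypothetical):
def ttask_simple_example():
    assert ttask_simple(tparam="x") == "result x"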

@task
def ttask_dataframe(tparam=1):
    # body truncated in the original snippet
    pass


def t_f_call(a=parameter[int]):
    assert a == 6

def test_params_default_none(self):
    class TDefaultNone(TTask):
        p_str = parameter(default=None)[str]
        p_str_optional = parameter(default=None)[Optional[str]]

    target_task = TDefaultNone()
    assert target_task.p_str is None
    assert target_task.p_str_optional is None

from dbnd import dbnd_run_cmd, parameter
from dbnd_test_scenarios.test_common.task.factories import TTask


class Baz(TTask):
    bool = parameter(default=False)[bool]


class BazTrue(TTask):
    bool = parameter.value(True)


class TBoolWithDefault(TTask):
    x = parameter.value(default=True)


class TestTaskBoolParameters(object):
    def test_bool_false_default(self):
        result = dbnd_run_cmd(["Baz"])
        assert result.task.bool is False

    def test_bool_true(self):
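        # assumed completion of the truncated test above, mirroring
        # test_bool_false_default for the BazTrue task defined earlier
        result = dbnd_run_cmd(["BazTrue"])
        assert result.task.bool is True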

class _ApacheBeamInlineTask(_BeamTask, _DecoratedTask):
    _conf__require_run_dump_file = True

    dataflow_build_pipeline = parameter(
        system=True, description="Build Pipeline object if not set"
    ).value(True)
    dataflow_wait_until_finish = parameter(
        system=True, description="Automatically wait for pipeline run to finish"
    ).value(True)
    dataflow_submit_dbnd_info = parameter(
        system=True, description="Add databand data to PipelineOptions"
    ).value(True)
    dataflow_pipeline = parameter(system=True, description="Dataflow Pipeline").value(
        None
    )[object]

    def _task_run(self):
        super(_ApacheBeamInlineTask, self)._task_run()

    def _task_options(self, python_pipeline_options):
        # We use the save_main_session option because one or more DoFn's in this
        # workflow rely on global context (e.g., a module imported at module level).
        from apache_beam.options.pipeline_options import PipelineOptions

        user_params = self._params.get_param_values(user_only=True)

        class TaskOptions(PipelineOptions):
            @classmethod
            def _add_argparse_args(cls, parser):

class HdfsPyox(FileSystem, Config):
    _conf__task_family = "hdfs_knox"

    host = parameter.c(description="HDFS name node URL", default="localhost")[str]
    port = parameter.c(description="HDFS name node port", default=50070)[int]
    user = parameter.c(default="root")[str]
    base = parameter.c(
        default=None,
        description="Base url for Knox, mutually exclusive with HdfsConfig.host",
    )[str]
    secure = parameter.c(default=False)[bool]
    gateway = parameter.c(default=None)[str]
    password = parameter.c(default=None)[str]
    cookies = parameter.c(default=None)[str]
    bearer_token = parameter.c(default=None)[str]
    bearer_token_encode = parameter.c(default=True)[bool]

    @staticmethod
    def _remove_schema(path):
        if path.startswith(HDFS_SCHEMA):
            return path[7:]
        return path

    @property
    def client(self):  # type: () -> WebHDFS
        from pyox import WebHDFS

        return WebHDFS(
            host=self.host,
            port=self.port,
            username=self.user,
            base=self.base,

import logging

from dbnd import parameter
from dbnd_docker.container_engine_config import ContainerEngineConfig
from dbnd_docker.docker.docker_task import DockerRunTask

logger = logging.getLogger(__name__)


class DockerEngineConfig(ContainerEngineConfig):
    _conf__task_family = "docker"

    def get_docker_ctrl(self, task_run):
        from dbnd_docker.docker.docker_task_run_ctrl import LocalDockerRunCtrl

        return LocalDockerRunCtrl(task_run=task_run)

    network = parameter(default=None, description="Docker Network to connect to")[str]
    api_version = parameter(
        description="Remote API version. "
        "Set to ``auto`` to automatically detect the server's version."
    )[str]
    docker_url = parameter(
        description="URL of the host running the docker daemon."
    ).value("unix://var/run/docker.sock")
    auto_remove = parameter(
        description="Auto-removal of the container on daemon side when the container's process exits."
    ).value(False)
    force_pull = parameter(
        description="Pull the docker image on every run. Default is False."
    ).value(False)
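
# A minimal usage sketch (assumption: Config subclasses accept parameter
# overrides as constructor kwargs; the task_run argument and the helper name
# below are hypothetical):
def docker_engine_example(task_run):
    engine = DockerEngineConfig(api_version="auto")
    return engine.get_docker_ctrl(task_run=task_run)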

import logging

from dbnd import parameter
from dbnd._core.constants import SparkClusters
from dbnd_spark.spark_config import SparkEngineConfig

logger = logging.getLogger(__name__)


class Qubole(object):
    aws = "aws"


class QuboleConfig(SparkEngineConfig):
    """Qubole cloud for Apache Spark"""

    _conf__task_family = "qubole"
    cluster_type = SparkClusters.qubole

    cloud = parameter(default="AWS", description="cloud")
    api_url = parameter(default="https://us.qubole.com/api").help(
        "API URL without version, e.g. 'https://us.qubole.com/api'"
    )[str]
    ui_url = parameter(default="https://api.qubole.com").help(
        "UI URL for accessing Qubole logs"
    )[str]
    api_token = parameter.help("API key of the Qubole account")[str]
    cluster_label = parameter().help("the label of the cluster to run the command on")[
        str
    ]
    status_polling_interval_seconds = parameter(default=10).help(
        "seconds to sleep between polling Qubole for job status."