How to use the kedro.pipeline.Pipeline function in kedro

To help you get started, weā€™ve selected a few kedro examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github quantumblacklabs / kedro / tests / pipeline / test_pipeline.py View on Github external
def test_names_only(self, str_node_inputs_list):
        pipeline = Pipeline(str_node_inputs_list["nodes"])
        description = pipeline.describe()

        desc = description.split("\n")
        test_desc = [
            "#### Pipeline execution order ####",
            "Name: None",
            "Inputs: input1, input2",
            "",
            "node1",
            "node2",
            "",
            "Outputs: input4",
            "##################################",
        ]

        assert len(desc) == len(test_desc)
github quantumblacklabs / kedro / tests / pipeline / test_pipeline.py View on Github external
def test_remove_from_empty_pipeline(self):
        """Remove node from an empty pipeline"""
        pipeline1 = Pipeline([node(biconcat, ["input", "input1"], "output1", name="a")])
        pipeline2 = Pipeline([])
        new_pipeline = pipeline2 - pipeline1
        assert new_pipeline.inputs() == pipeline2.inputs()
        assert new_pipeline.outputs() == pipeline2.outputs()
        assert not new_pipeline.nodes
github quantumblacklabs / kedro / tests / pipeline / test_pipeline.py View on Github external
def test_invalid_union(self):
        p = Pipeline([])
        pattern = r"unsupported operand type\(s\) for |: 'Pipeline' and 'str'"
        with pytest.raises(TypeError, match=pattern):
            p | "hello"  # pylint: disable=pointless-statement
github quantumblacklabs / kedro / tests / pipeline / test_pipeline.py View on Github external
def test_tag_existing_pipeline(self, branchless_pipeline):
        pipeline = Pipeline(branchless_pipeline["nodes"])
        pipeline = pipeline.tag(["new_tag"])
        assert all("new_tag" in n.tags for n in pipeline.nodes)
github quantumblacklabs / kedro / tests / runner / test_parallel_runner.py View on Github external
def test_memory_data_set_input(self, fan_out_fan_in):
        pipeline = Pipeline([fan_out_fan_in])
        catalog = DataCatalog({"A": MemoryDataSet("42")})
        result = ParallelRunner().run(pipeline, catalog)
        assert "Z" in result
        assert len(result["Z"]) == 3
        assert result["Z"] == ("42", "42", "42")
github quantumblacklabs / kedro / tests / runner / test_parallel_runner.py View on Github external
def decorated_fan_out_fan_in():
    return Pipeline(
        [
            node(decorated_identity, "A", "B"),
            node(decorated_identity, "B", "C"),
            node(decorated_identity, "B", "D"),
            node(decorated_identity, "B", "E"),
            node(fan_in, ["C", "D", "E"], "Z"),
        ]
github quantumblacklabs / kedro / tests / pipeline / test_pipeline.py View on Github external
def test_initialized_with_tags(self):
        pipeline = Pipeline(
            [node(identity, "A", "B", tags=["node1", "p1"]), node(identity, "B", "C")],
            tags=["p1", "p2"],
        )

        node1 = pipeline.grouped_nodes[0].pop()
        node2 = pipeline.grouped_nodes[1].pop()
        assert node1.tags == {"node1", "p1", "p2"}
        assert node2.tags == {"p1", "p2"}
github quantumblacklabs / kedro / tests / context / test_context.py View on Github external
def _get_pipelines(self) -> Dict[str, Pipeline]:
                return {"__default__": Pipeline([])}
github quantumblacklabs / kedro / tests / pipeline / test_pipeline.py View on Github external
def test_remove_all_nodes(self):
        """Remove an entire pipeline"""
        pipeline1 = Pipeline([node(biconcat, ["input", "input1"], "output1", name="a")])
        pipeline2 = Pipeline([node(biconcat, ["input", "input1"], "output1", name="a")])
        new_pipeline = pipeline1 - pipeline2
        assert new_pipeline.inputs() == set()
        assert new_pipeline.outputs() == set()
        assert not new_pipeline.nodes
github quantumblacklabs / kedro / tests / pipeline / test_pipeline.py View on Github external
def test_empty_apply(self):
        """Applying no decorators is valid."""
        identity_node = node(identity, "number", "output", name="identity")
        pipeline = Pipeline([identity_node]).decorate()
        catalog = DataCatalog({}, dict(number=1))
        result = SequentialRunner().run(pipeline, catalog)
        assert result["output"] == 1