How to use the pyflink.table.StreamTableEnvironment.create function in pyflink

To help you get started, we’ve selected a few pyflink examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github apache / flink / flink-python / pyflink / testing / test_case_utils.py View on Github external
def setUp(self):
        """Prepare a streaming execution environment and table environment.

        Runs the parent fixture first, then stores the execution environment
        (parallelism set to 2) on ``self.env`` and the table environment
        created from it on ``self.t_env``.
        """
        super(PyFlinkStreamTableTestCase, self).setUp()
        # Configure the environment through a local before publishing it.
        stream_env = StreamExecutionEnvironment.get_execution_environment()
        stream_env.set_parallelism(2)
        self.env = stream_env
        self.t_env = StreamTableEnvironment.create(stream_env)
github alibaba / flink-ai-extended / flink-ml-tensorflow / python / flink_ml_tensorflow / tensorflow_on_flink_table_examples.py View on Github external
def inputOutputTable():
        stream_env = StreamExecutionEnvironment.get_execution_environment()
        table_env = StreamTableEnvironment.create(stream_env)
        work_num = 2
        ps_num = 1
        python_file = os.getcwd() + "/../../src/test/python/input_output.py"
        property = {}
        func = "map_func"
        env_path = None
        zk_conn = None
        zk_base_path = None
        property[MLCONSTANTS.ENCODING_CLASS] = "com.alibaba.flink.ml.operator.coding.RowCSVCoding"
        property[MLCONSTANTS.DECODING_CLASS] = "com.alibaba.flink.ml.operator.coding.RowCSVCoding"
        inputSb = "INT_32" + "," + "INT_64" + "," + "FLOAT_32" + "," + "FLOAT_64" + "," + "STRING"
        property["SYS:csv_encode_types"] = inputSb
        property["SYS:csv_decode_types"] = inputSb
        source_file = os.getcwd() + "/../../src/test/resources/input.csv"
        table_source = CsvTableSource(source_file,
                                      ["a", "b", "c", "d", "e"],
github alibaba / flink-ai-extended / flink-ml-tensorflow / python / flink_ml_tensorflow / tensorflow_on_flink_table_examples.py View on Github external
def addTrainChiefAloneTable():
        """Run the ``add.py`` training example with the chief-alone flag set.

        Creates a stream execution environment and a table environment from
        it, then calls ``train`` with ``work_num=2``, ``ps_num=1``, the
        ``map_func`` entry point, and a property dict containing only the
        ``TFCONSTANS.TF_IS_CHIEF_ALONE`` flag. No environment path, ZooKeeper
        settings, input table or output schema are supplied (all ``None``).
        """
        stream_env = StreamExecutionEnvironment.get_execution_environment()
        table_env = StreamTableEnvironment.create(stream_env)
        work_num = 2
        ps_num = 1
        # The script is located relative to the current working directory.
        python_file = os.getcwd() + "/../../src/test/python/add.py"
        func = "map_func"
        property = {}
        # The value must be spelled "true" for the flag to take effect.
        # NOTE(review): presumably the consumer parses this as a boolean
        # string -- confirm against the TF_IS_CHIEF_ALONE reader.
        property[TFCONSTANS.TF_IS_CHIEF_ALONE] = "true"
        env_path = None
        zk_conn = None
        zk_base_path = None
        input_tb = None
        output_schema = None

        train(work_num, ps_num, python_file, func, property, env_path, zk_conn, zk_base_path, stream_env, table_env,
              input_tb, output_schema)
        # inference(work_num, ps_num, python_file, func, property, env_path, zk_conn, zk_base_path, stream_env, table_env,