How to use the pyspark.sql.types.StringType class in PySpark

To help you get started, we’ve selected a few pyspark.sql.types.StringType examples, based on popular ways it is used in public projects.

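Before the project excerpts, here is a minimal, self-contained sketch of the two patterns that recur throughout them: declaring a nullable StringType() column in an explicit StructType schema, and passing StringType() as the return type of a Python UDF. The app name, column names, and sample rows are illustrative assumptions, not taken from any of the projects below.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.appName('stringtype-demo').getOrCreate()

# Explicit schema: StringType() declares a string column; the third argument
# marks the field as nullable.
schema = StructType([StructField("name", StringType(), True)])

df = spark.createDataFrame([("jose",), ("li",), (None,)], schema)

# StringType() also tells Spark what type a Python UDF returns.
greet = udf(lambda name: None if name is None else "hello " + name, StringType())
df.withColumn("greeting", greet(col("name"))).show()

The excerpts below apply the same two ideas at different scales, from unit-test fixtures to streaming pipelines.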

github mozilla / python_moztelemetry / tests / test_dataset.py
def test_dataframe_bad_schema(dataset, spark):
    spark.catalog.dropTempView('bar')
    schema = StructType([StructField("name", StringType(), True)])
    df = dataset.dataframe(spark, decode=decode, schema=schema, table_name='bar')

    assert type(df) == DataFrame
    assert df.collect() == [Row(name=None), Row(name=None)]
github MrPowers / quinn / tests / test_transformations.py
def it_snake_cases_col_names(spark):
    schema = StructType([
        StructField("I like CHEESE", StringType(), True),
        StructField("YUMMMMY stuff", StringType(), True)
    ])
    data = [("jose", "a"), ("li", "b"), ("sam", "c")]
    source_df = spark.createDataFrame(data, schema)
    actual_df = quinn.snake_case_col_names(source_df)
    expected_df = spark.create_df(
        [
            ("jose", "a"),
            ("li", "b"),
            ("sam", "c")
        ],
        [
            ("i_like_cheese", StringType(), True),
            ("yummmmy_stuff", StringType(), True)
        ]
    )
    chispa.assert_df_equality(actual_df, expected_df)
github tubular / sparkly / tests / unit / test_hive_metastore_manager.py
def test_struct(self):
    self.assertEqual(
        _type_to_hql(
            StructType([
                StructField('name', StringType()),
                StructField('age', DateType())
            ]).jsonValue()
        ),
        'struct<`name`:string,`age`:date>',
    )
github dmwm / CMSSpark / src / python / CMSSpark / schemas.py
def schema_dataset_access_types():
    """
    DBS DATASET_ACCESS_TYPES table schema

    DATASET_ACCESS_TYPE_ID NOT NULL NUMBER(38)
    DATASET_ACCESS_TYPE NOT NULL VARCHAR2(100)

    :returns: StructType consisting of a StructField array
    """
    return StructType([
            StructField("dataset_access_type_id", IntegerType(), True),
            StructField("dataset_access_type", StringType(), True)
        ])
github NYUBigDataProject / SparkClean / sparkclean / df_transformer.py
        self._assert_cols_in_df(columns_provided=column, columns_df=self._df.columns)

        # Asserting if selected column datatype and search and changeTo parameters are the same:
        col_not_valids = (set(column).difference(set([column for column in valid_cols])))
        assert (col_not_valids == set()), 'Error: The column provided is not a column string: %s' % col_not_valids

        # User defined function to search cell value in list provide by user:
        if isinstance(str_to_replace, str) and list_str is not None:

            def check(cell):
                if cell is not None and (cell in list_str):
                    return str_to_replace
                else:
                    return cell

            func = udf(lambda cell: check(cell), StringType())
        else:
            def replace_from_dic(str_test):
                for key in str_to_replace.keys():
                    if str_test in str_to_replace[key]:
                        str_test = key
                return str_test

            func = udf(lambda cell: replace_from_dic(cell) if cell is not None else cell, StringType())

        # Calling udf for each row of column provided by user. The rest of dataFrame is
        # maintained the same.
        exprs = [func(col(c)).alias(c) if c == column[0] else c for c in self._df.columns]

        self._df = self._df.select(*exprs)

        self._add_transformation()  # checkpoint in case
github dmwm / CMSSpark / src / python / CMSSpark / schemas.py
    :returns: StructType consisting of a StructField array
    """
    return StructType([
            StructField("d_dataset_id", IntegerType(), True),
            StructField("d_dataset", StringType(), True),
            StructField("d_is_dataset_valid", IntegerType(), True),
            StructField("d_primary_ds_id", IntegerType(), True),
            StructField("d_processed_ds_id", IntegerType(), True),
            StructField("d_data_tier_id", IntegerType(), True),
            StructField("d_dataset_access_type_id", IntegerType(), True),
            StructField("d_acquisition_era_id", IntegerType(), True),
            StructField("d_processing_era_id", IntegerType(), True),
            StructField("d_physics_group_id", IntegerType(), True),
            StructField("d_xtcrosssection", DoubleType(), True),
            StructField("d_prep_id", StringType(), True),
            StructField("d_creation_date", DoubleType(), True),
            StructField("d_create_by", StringType(), True),
            StructField("d_last_modification_date", DoubleType(), True),
            StructField("d_last_modified_by", StringType(), True)
        ])
github Azure / MachineLearningNotebooks / how-to-use-azureml / training / train-in-spark / train-spark.py
# start Spark session
spark = pyspark.sql.SparkSession.builder.appName('Iris').getOrCreate()

# print runtime versions
print('****************')
print('Python version: {}'.format(sys.version))
print('Spark version: {}'.format(spark.version))
print('****************')

# load iris.csv into Spark dataframe
schema = StructType([
    StructField("sepal-length", DoubleType()),
    StructField("sepal-width", DoubleType()),
    StructField("petal-length", DoubleType()),
    StructField("petal-width", DoubleType()),
    StructField("class", StringType())
])

data = spark.read.format("com.databricks.spark.csv") \
    .option("header", "true") \
    .schema(schema) \
    .load("iris.csv")

print("First 10 rows of Iris dataset:")
data.show(10)

# vectorize all numerical columns into a single feature column
feature_cols = data.columns[:-1]
assembler = pyspark.ml.feature.VectorAssembler(
    inputCols=feature_cols, outputCol='features')
data = assembler.transform(data)
github databricks / koalas / databricks / koalas / strings.py
        Name: 0, dtype: object

        >>> s.str.rjust(width=10)
        0       caribou
        1         tiger
        Name: 0, dtype: object

        >>> s.str.rjust(width=10, fillchar='-')
        0    ---caribou
        1    -----tiger
        Name: 0, dtype: object
        """
        return _wrap_accessor_pandas(
            self,
            lambda x: x.str.rjust(width, fillchar),
            StringType()
        ).alias(self.name)
github openstack / monasca-transform / monasca_transform / transform / transform_utils.py
StructField("aggregation_params_map",
                        StructType([StructField("aggregation_period",
                                                StringType(), True),
                                    StructField("dimension_list",
                                                ArrayType(StringType(),
                                                          containsNull=False),
                                                True),
                                    StructField("aggregation_group_by_list",
                                                ArrayType(StringType(),
                                                          containsNull=False),
                                                True),
                                    StructField("usage_fetch_operation",
                                                StringType(),
                                                True),
                                    StructField("filter_by_list",
                                                ArrayType(MapType(StringType(),
                                                                  StringType(),
                                                                  True)
                                                          )
                                                ),
                                    StructField(
                                    "usage_fetch_util_quantity_event_type",
                                    StringType(),
                                    True),

                                    StructField(
                                    "usage_fetch_util_idle_perc_event_type",
                                    StringType(),
                                    True),

                                    StructField("setter_rollup_group_by_list",
                                                ArrayType(StringType(),
github SuperCowPowers / zat / examples / kafka_spark.py
.add('qclass', IntegerType()).add('qclass_name', StringType()).add('qtype', IntegerType()) \
                             .add('qtype_name', StringType()).add('rcode', IntegerType()).add('rcode_name', StringType()) \
                             .add('AA', BooleanType()).add('TC', BooleanType()).add('RD', BooleanType()).add('RA', BooleanType()) \
                             .add('Z', IntegerType()).add('answers', StringType()).add('TTLs', StringType()).add('rejected', BooleanType())

    # ETL: Convert raw data into parsed and proper typed data
    parsed_data = raw_data.select(from_json(col('value').cast('string'), dns_schema).alias('data')).select('data.*')

    # FILTER: Only get DNS records that have 'query' field filled out
    filtered_data = parsed_data.filter(parsed_data.query.isNotNull() & (parsed_data.query!='')==True)

    # FILTER 2: Remove Local/mDNS queries
    filtered_data = filtered_data.filter(~filtered_data.query.like('%.local'))  # Note: using the '~' negation operator

    # COMPUTE: A new column with the 2nd level domain extracted from the query
    udf_compute_domain = udf(compute_domain, StringType())
    computed_data = filtered_data.withColumn('domain', udf_compute_domain('query'))

    # AGGREGATE: In this case a simple groupby operation
    group_data = computed_data.groupBy('`id.orig_h`', 'domain', 'qtype_name').count()

    # At any point in the pipeline you can see what you're getting out
    group_data.printSchema()

    # Take the end of our pipeline and pull it into memory
    dns_count_memory_table = group_data.writeStream.format('memory').queryName('dns_counts').outputMode('complete').start()

    # Let the pipeline pull some data
    print('Pulling pipeline...Please wait...')

    # Create a Pandas Dataframe by querying the in memory table and converting
    # Loop around every 5 seconds to update output
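
To round out the streaming excerpt, here is a minimal, hedged sketch of the in-memory query step described in the final comments. The dns_counts table name and the five-second cadence come from the code above; the reuse of the Spark session via getOrCreate(), the three-iteration loop bound, and the use of toPandas() are illustrative assumptions, not the project's actual code.

import time

from pyspark.sql import SparkSession

# Reuse the session already created earlier in the script.
spark = SparkSession.builder.getOrCreate()

# Poll the in-memory sink registered via queryName('dns_counts') and convert
# each snapshot to a pandas DataFrame for display.
for _ in range(3):  # loop bound is arbitrary, chosen for illustration
    dns_counts_df = spark.sql('SELECT * FROM dns_counts').toPandas()
    print(dns_counts_df)
    time.sleep(5)  # refresh every 5 seconds, per the comment above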