How to use pyspark - 10 common examples

To help you get started, we’ve selected a few pyspark examples, based on popular ways it is used in public projects.

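Most of the snippets below follow the same basic pattern: build a SparkSession, read data into a DataFrame or RDD, transform it, and stop the session. As a minimal, self-contained sketch of that pattern (the file name people.csv and the country column are placeholders, not taken from any of the projects below):

from pyspark.sql import SparkSession

# Build (or reuse) a local SparkSession.
spark = (
    SparkSession.builder
    .appName("getting-started")
    .getOrCreate()
)

# "people.csv" and the "country" column are placeholders for illustration only.
df = spark.read.csv("people.csv", header=True, inferSchema=True)
df.groupBy("country").count().show()

spark.stop()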

github IamMayankThakur/test-bigdata/adminmgr/media/code/A2/python/task/BD_1621_1634_1906_U2kyAzB.py (View on Github)
# The excerpt starts mid-file: this is the tail of the sum1() helper called
# below, so the def line and the initial u = 0.0 are inferred from that call.
def sum1(z):
    u = 0.0
    for i in range(len(z)):
        u = u + float(z[i])
    if u > 1:
        return float(u)
    else:
        return 1.0


if __name__ == "__main__":
        if len(sys.argv) != 4:
	        print("Usage: pagerank  ", file=sys.stderr)
                sys.exit(-1)

	# Initialize the spark context.
        spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()

        lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])

        links = lines.map(lambda batsman: parseNeighbors(batsman)).distinct().groupByKey().cache()
        
        links1=lines.map(lambda batsman: parseNeighbors1(batsman)).distinct().groupByKey().cache()
        
        
	link=links.map(lambda x:(x[0],float(sum1(list(x[1])))))
        #print(links.collect())
        ranks = link.map(lambda url_neighbors: (url_neighbors[0],url_neighbors[1]))
        #print(ranks.collect())
        iterations=0
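
The excerpt is cut off before the rank-update loop. For orientation, Spark's stock PageRank example iterates roughly as in the following self-contained sketch; the toy graph, the iteration count of 10, and the 0.85/0.15 damping split are illustrative assumptions, not this assignment's actual logic.

from operator import add
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PageRankSketch").getOrCreate()

# Toy link graph of (page, [outgoing links]) pairs -- illustrative data only.
links = spark.sparkContext.parallelize(
    [("a", ["b", "c"]), ("b", ["c"]), ("c", ["a"])]
).cache()
ranks = links.mapValues(lambda _: 1.0)

def compute_contribs(urls, rank):
    # Spread a page's rank evenly over its outgoing links.
    for url in urls:
        yield url, rank / len(urls)

for _ in range(10):
    contribs = links.join(ranks).flatMap(
        lambda page_urls_rank: compute_contribs(page_urls_rank[1][0], page_urls_rank[1][1])
    )
    ranks = contribs.reduceByKey(add).mapValues(lambda rank: 0.15 + 0.85 * rank)

print(sorted(ranks.collect()))
spark.stop()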

github IamMayankThakur/test-bigdata/adminmgr/media/code/A2/python/task/BD_188_1000_1767.py (View on Github)
# Excerpt starts mid-file: tail of the compute() helper called below as
# compute(x[0], x[1]); only its final return is shown.
def compute(key, value):
    return key, 1

def parseNeighbors(urls):
    parts = re.split(r',', urls)
    return parts[0],int(parts[2])/int(parts[3])

def parseNeigbors1(urls):
    parts = re.split(r',',urls)
    return parts[0],parts[1]

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: pagerank  ", file=sys.stderr)
        sys.exit(-1)
    # Initialize the Spark session.
    spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()
    some_value = float(sys.argv[3]) / 100
    if some_value == 0:
        some_value = 0.8
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    links2 = lines.map(lambda urls: parseNeighbors(urls)).groupByKey().mapValues(sum).cache()
    ranks=links2.map(lambda x:compute(x[0],x[1]))
    prevranks=links2.map(lambda x:compute(x[0],x[1]))
    links1=lines.map(lambda urls: parseNeigbors1(urls)).groupByKey().cache()
    
    count_value = 0
    count = 0
    t = True

github IamMayankThakur/test-bigdata/adminmgr/media/code/A2/python/task/BD_94_155_1509.py (View on Github)
# Excerpt starts mid-file: tail of the parseNeighbors() helper used below.
def parseNeighbors(urls):
    parts = re.split(r',+', urls)
    return parts[0], parts[1]

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("Usage: pagerank   ", file=sys.stderr)
        sys.exit(-1)
    if int(sys.argv[3]) == 0:
        first = 0.80
        second = 0.20
    if int(sys.argv[3]) > 0:
        first = int(sys.argv[3]) * 0.01   # cast needed: sys.argv values are strings
        second = 1 - first
    # Initialize the Spark session.
    spark = SparkSession\
        .builder\
        .appName("PythonPageRank")\
        .getOrCreate()
    
    lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
    #print(lines.collect())
    
    links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
    #print(links.collect())
    inter_ranks = lines.map(lambda urls: avg(urls)).distinct().reduceByKey(add)
    ranks=inter_ranks.map(lambda x:(x[0],max(x[1],1))).sortBy(lambda x:(x[1],x[0]),False)
    #t = ranks.collect()
    #print(ranks)
    

    #print("RANKS:",ranks.collect())

github dagster-io/dagster/examples/dagster_examples_tests/airline_demo_tests/test_types.py (View on Github)
def test_spark_dataframe_output_csv():
    spark = SparkSession.builder.getOrCreate()
    num_df = (
        spark.read.format('csv')
        .options(header='true', inferSchema='true')
        .load(file_relative_path(__file__, 'num.csv'))
    )

    assert num_df.collect() == [Row(num1=1, num2=2)]

    @solid
    def emit(_):
        return num_df

    @solid(input_defs=[InputDefinition('df', DataFrame)], output_defs=[OutputDefinition(DataFrame)])
    def passthrough_df(_context, df):
        return df

    @pipeline
    def passthrough():
        passthrough_df(emit())

    with seven.TemporaryDirectory() as tempdir:
        file_name = os.path.join(tempdir, 'output.csv')
        result = execute_pipeline(

github getsentry/sentry-python/tests/integrations/spark/test_spark.py (View on Github)
def test_start_sentry_listener():
    spark_context = SparkContext.getOrCreate()

    gateway = spark_context._gateway
    assert gateway._callback_server is None

    _start_sentry_listener(spark_context)

    assert gateway._callback_server is not None
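
The test drives the listener through private gateway attributes; in application code the same listener is normally installed by enabling Sentry's Spark integration when initializing the SDK, roughly as sketched below (the DSN is a placeholder).

import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration

# Placeholder DSN; enabling SparkIntegration registers the driver-side listener.
sentry_sdk.init(
    dsn="https://examplePublicKey@o0.ingest.sentry.io/0",
    integrations=[SparkIntegration()],
)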

github mesosphere/spark-build/tests/jobs/python/pi_with_include.py (View on Github)
import sys  # needed for sys.argv below
from random import random
from operator import add

from pyspark.sql import SparkSession

import PySparkTestInclude

if __name__ == "__main__":
    """
        Usage: pi [partitions]
    """

    # Make sure we can include this user-provided module
    PySparkTestInclude.func()

    spark = SparkSession\
        .builder\
        .appName("PythonPi")\
        .getOrCreate()

    partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
    n = 100000 * partitions

    def f(_):
        x = random() * 2 - 1
        y = random() * 2 - 1
        return 1 if x ** 2 + y ** 2 < 1 else 0

    count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
    print("Pi is roughly %f" % (4.0 * count / n))

    spark.stop()

github htorrence/pytest_examples/tests/fixtures.py (View on Github)
# Excerpt starts mid-file; this function serves as a pytest fixture (note the
# FixtureRequest parameter documented below).
def spark(request):
    """
    Creates a spark context

    Parameters
    ----------
    request: pytest.FixtureRequest object
        provides access to testing context
    """

    spark = (
        SparkSession
        .builder
        .appName('pytest-pyspark-local-testing')
        .master('local[2]')
        .getOrCreate()
    )

    request.addfinalizer(lambda: spark.stop())

    return spark
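
Assuming this function is registered as a pytest fixture named spark (the decorator sits above the excerpt), a test consumes it simply by naming it as an argument. The rows and assertion below are illustrative, not from the pytest_examples repo.

# Hypothetical test consuming the fixture above; data and assertion are examples only.
def test_row_count(spark):
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
    assert df.count() == 2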

github TresAmigosSD/SMV/src/main/python/test_support/testconfig.py (View on Github)
            #   * Create python SparkContext using the SparkConf (so we can specify the warehouse.dir)
            #   * Create Scala side HiveTestContext SparkSession
            #   * Create python SparkSession
            jgw = launch_gateway(None)
            jvm = jgw.jvm
            import tempfile
            import getpass
            hivedir = "file://{0}/{1}/smv_hive_test".format(tempfile.gettempdir(), getpass.getuser())
            sConf = SparkConf(False, _jvm=jvm).set("spark.sql.test", "")\
                                              .set("spark.sql.hive.metastore.barrierPrefixes",
                                                   "org.apache.spark.sql.hive.execution.PairSerDe")\
                                              .set("spark.sql.warehouse.dir", hivedir)\
                                              .set("spark.ui.enabled", "false")
            sc = SparkContext(master="local[1]", appName="SMV Python Test", conf=sConf, gateway=jgw).getOrCreate()
            jss = sc._jvm.org.apache.spark.sql.hive.test.SmvTestHive.createContext(sc._jsc.sc())
            cls.spark = SparkSession(sc, jss.sparkSession())
        return cls.spark

github awslabs/aws-data-wrangler/testing/test_awswrangler/test_spark.py (View on Github)
def session():
    yield Session(spark_session=SparkSession.builder.appName("AWS Wrangler Test").getOrCreate())

github opentargets/genetics-finemapping/tests/split_qtl/split_qtl.py (View on Github)
def main():

    # Args
    mol_pattern = '/home/emountjoy_statgen/data/sumstats/molecular_trait/*.parquet'
    out_dir = '/home/emountjoy_statgen/data/sumstats/molecular_trait_2/'
    # mol_pattern = '/Users/em21/Projects/genetics-finemapping/example_data/sumstats/molecular_trait/*.parquet'
    # out_dir = '/Users/em21/Projects/genetics-finemapping/example_data/sumstats/molecular_trait_2/'

    # Make spark session
    spark = (
        pyspark.sql.SparkSession.builder
        .config("parquet.enable.summary-metadata", "true")
        .getOrCreate()
    )
    print('Spark version: ', spark.version)

    # Process each
    for inf in glob(mol_pattern):

        # Load
        df = spark.read.parquet(inf)

        # Write
        outf = os.path.join(out_dir, os.path.basename(inf))
        (
            df
            .write