import sys
from pyspark.sql import SparkSession

def sum1(z):
    # Reconstructed header: sum the weights, returning at least 1.0.
    u = 0.0
    for i in range(len(z)):
        u = u + float(z[i])
    if u > 1:
        return float(u)
    else:
        return 1.0
if __name__ == "__main__":
if len(sys.argv) != 4:
print("Usage: pagerank ", file=sys.stderr)
sys.exit(-1)
# Initialize the spark context.
spark = SparkSession\
.builder\
.appName("PythonPageRank")\
.getOrCreate()
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
links = lines.map(lambda batsman: parseNeighbors(batsman)).distinct().groupByKey().cache()
links1=lines.map(lambda batsman: parseNeighbors1(batsman)).distinct().groupByKey().cache()
link=links.map(lambda x:(x[0],float(sum1(list(x[1])))))
#print(links.collect())
ranks = link.map(lambda url_neighbors: (url_neighbors[0],url_neighbors[1]))
#print(ranks.collect())
iterations=0
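
    # The snippet breaks off after setting up the rank and link RDDs. A generic
    # iteration step in the style of the standard Spark PageRank example is
    # sketched below; the iteration count of 10 and the 0.85 damping factor are
    # assumptions for illustration, not values taken from this script.
    for _ in range(10):
        contribs = links1.join(ranks).flatMap(
            lambda url_urls_rank: [
                (url, url_urls_rank[1][1] / len(url_urls_rank[1][0]))
                for url in url_urls_rank[1][0]
            ])
        ranks = contribs.reduceByKey(lambda a, b: a + b) \
                        .mapValues(lambda rank: rank * 0.85 + 0.15)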
def compute(key, value):
    # Reconstructed header: only the return survives in the snippet; every
    # key gets an initial rank of 1, matching the compute() calls below.
    return key, 1

def parseNeighbors(urls):
    parts = re.split(r',', urls)
    return parts[0], int(parts[2]) / int(parts[3])

def parseNeighbors1(urls):
    parts = re.split(r',', urls)
    return parts[0], parts[1]
if __name__ == "__main__":
if len(sys.argv) != 4:
print("Usage: pagerank ", file=sys.stderr)
sys.exit(-1)
# Initialize the spark context.
spark = SparkSession\
.builder\
.appName("PythonPageRank")\
.getOrCreate()
some_value = float((float(sys.argv[3]))/100)
if (some_value == 0):
some_value = 0.8
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
links2 = lines.map(lambda urls: parseNeighbors(urls)).groupByKey().mapValues(sum).cache()
ranks=links2.map(lambda x:compute(x[0],x[1]))
prevranks=links2.map(lambda x:compute(x[0],x[1]))
links1=lines.map(lambda urls: parseNeigbors1(urls)).groupByKey().cache()
count_value = 0
count = 0
t = True
def parseNeighbors(urls):
    # Reconstructed header: only the parser body survives in this snippet.
    parts = re.split(r',+', urls)
    return parts[0], parts[1]
if __name__ == "__main__":
if len(sys.argv) != 4:
print("Usage: pagerank ", file=sys.stderr)
sys.exit(-1)
if(int(sys.argv[3])==0):
first=0.80
second=0.20
if(int(sys.argv[3])>0):
first=sys.argv[3]*0.01
second=1-first
# Initialize the spark context.
spark = SparkSession\
.builder\
.appName("PythonPageRank")\
.getOrCreate()
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
#print(lines.collect())
links = lines.map(lambda urls: parseNeighbors(urls)).distinct().groupByKey().cache()
#print(links.collect())
inter_ranks = lines.map(lambda urls: avg(urls)).distinct().reduceByKey(add)
ranks=inter_ranks.map(lambda x:(x[0],max(x[1],1))).sortBy(lambda x:(x[1],x[0]),False)
#t = ranks.collect()
#print(ranks)
#print("RANKS:",ranks.collect())
def test_spark_dataframe_output_csv():
    spark = SparkSession.builder.getOrCreate()
    num_df = (
        spark.read.format('csv')
        .options(header='true', inferSchema='true')
        .load(file_relative_path(__file__, 'num.csv'))
    )
    assert num_df.collect() == [Row(num1=1, num2=2)]

    @solid
    def emit(_):
        return num_df

    @solid(input_defs=[InputDefinition('df', DataFrame)], output_defs=[OutputDefinition(DataFrame)])
    def passthrough_df(_context, df):
        return df

    @pipeline
    def passthrough():
        passthrough_df(emit())

    with seven.TemporaryDirectory() as tempdir:
        file_name = os.path.join(tempdir, 'output.csv')
        result = execute_pipeline(
def test_start_sentry_listener():
    spark_context = SparkContext.getOrCreate()
    gateway = spark_context._gateway
    assert gateway._callback_server is None

    _start_sentry_listener(spark_context)

    assert gateway._callback_server is not None
import sys
from random import random
from operator import add

from pyspark.sql import SparkSession

import PySparkTestInclude
if __name__ == "__main__":
"""
Usage: pi [partitions]
"""
# Make sure we can include this user-provided module
PySparkTestInclude.func()
spark = SparkSession\
.builder\
.appName("PythonPi")\
.getOrCreate()
partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
n = 100000 * partitions
def f(_):
x = random() * 2 - 1
y = random() * 2 - 1
return 1 if x ** 2 + y ** 2 < 1 else 0
count = spark.sparkContext.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
print("Pi is roughly %f" % (4.0 * count / n))
spark.stop()
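
# One possible way to launch this example with spark-submit; the script
# filename and the --py-files archive are assumptions for illustration:
#
#   spark-submit --py-files PySparkTestInclude.py pi_with_include.py 10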
def spark(request):
    """
    Creates a spark context

    Parameters
    ----------
    request: pytest.FixtureRequest object
        provides access to testing context
    """
    spark = (
        SparkSession
        .builder
        .appName('pytest-pyspark-local-testing')
        .master('local[2]')
        .getOrCreate()
    )
    request.addfinalizer(lambda: spark.stop())
    return spark
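
# A minimal sketch of a test that consumes the fixture above; it assumes the
# fixture is registered with @pytest.fixture (the decorator is not shown in
# the snippet), and the toy DataFrame is purely illustrative.
def test_spark_fixture_creates_dataframe(spark):
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "value"])
    assert df.count() == 2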
# * Create python SparkContext using the SparkConf (so we can specify the warehouse.dir)
# * Create Scala side HiveTestContext SparkSession
# * Create python SparkSession
jgw = launch_gateway(None)
jvm = jgw.jvm
import tempfile
import getpass
hivedir = "file://{0}/{1}/smv_hive_test".format(tempfile.gettempdir(), getpass.getuser())
sConf = SparkConf(False, _jvm=jvm).set("spark.sql.test", "")\
.set("spark.sql.hive.metastore.barrierPrefixes",
"org.apache.spark.sql.hive.execution.PairSerDe")\
.set("spark.sql.warehouse.dir", hivedir)\
.set("spark.ui.enabled", "false")
sc = SparkContext(master="local[1]", appName="SMV Python Test", conf=sConf, gateway=jgw).getOrCreate()
jss = sc._jvm.org.apache.spark.sql.hive.test.SmvTestHive.createContext(sc._jsc.sc())
cls.spark = SparkSession(sc, jss.sparkSession())
return cls.spark
def session():
    yield Session(spark_session=SparkSession.builder.appName("AWS Wrangler Test").getOrCreate())
import os
from glob import glob

import pyspark.sql

def main():

    # Args
    mol_pattern = '/home/emountjoy_statgen/data/sumstats/molecular_trait/*.parquet'
    out_dir = '/home/emountjoy_statgen/data/sumstats/molecular_trait_2/'
    # mol_pattern = '/Users/em21/Projects/genetics-finemapping/example_data/sumstats/molecular_trait/*.parquet'
    # out_dir = '/Users/em21/Projects/genetics-finemapping/example_data/sumstats/molecular_trait_2/'

    # Make spark session
    spark = (
        pyspark.sql.SparkSession.builder
        .config("parquet.enable.summary-metadata", "true")
        .getOrCreate()
    )
    print('Spark version: ', spark.version)

    # Process each
    for inf in glob(mol_pattern):

        # Load
        df = spark.read.parquet(inf)

        # Write
        outf = os.path.join(out_dir, os.path.basename(inf))
        (
            df
            .write