def setUp(self):
    super(SetAggregatedMetricNameTest, self).setUp()
    self.sql_context = SQLContext(self.spark_context)
model = rf.fit(train)
predictionAndLabels = model.transform(test).select("prediction", "indexedLabel") \
    .map(lambda x: (x.prediction, x.indexedLabel))
metrics = RegressionMetrics(predictionAndLabels)
print("rmse %.3f" % metrics.rootMeanSquaredError)
print("r2 %.3f" % metrics.r2)
print("mae %.3f" % metrics.meanAbsoluteError)
if __name__ == "__main__":
if len(sys.argv) > 1:
print("Usage: gradient_boosted_trees", file=sys.stderr)
exit(1)
sc = SparkContext(appName="PythonGBTExample")
sqlContext = SQLContext(sc)
# Load the data stored in LIBSVM format as a DataFrame.
df = sqlContext.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
# Map labels into an indexed column of labels in [0, numLabels)
stringIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
si_model = stringIndexer.fit(df)
td = si_model.transform(df)
[train, test] = td.randomSplit([0.7, 0.3])
testClassification(train, test)
testRegression(train, test)
sc.stop()
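The example above calls a testClassification helper that is not shown here. A sketch of what such a helper could look like under the same Spark 1.x assumptions (GBTClassifier from pyspark.ml and MulticlassMetrics from pyspark.mllib); the hyperparameters are placeholders, not the original values.

from pyspark.ml.classification import GBTClassifier
from pyspark.mllib.evaluation import MulticlassMetrics

def testClassification(train, test):
    # Train a gradient-boosted tree classifier on the indexed labels.
    gbt = GBTClassifier(maxIter=30, maxDepth=4, labelCol="indexedLabel")
    model = gbt.fit(train)
    # Pair predictions with true labels (DataFrame.map exists on Spark 1.x).
    predictionAndLabels = model.transform(test).select("prediction", "indexedLabel") \
        .map(lambda x: (x.prediction, x.indexedLabel))
    metrics = MulticlassMetrics(predictionAndLabels)
    print("weighted f-measure %.3f" % metrics.weightedFMeasure())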
"event_timestamp_unix": "count"}
# do a group by
grouped_data = record_store_df_int.groupBy(*group_by_columns_list)
grouped_record_store_df = grouped_data.agg(agg_operations_map)
grouped_data_rdd_with_operation = grouped_record_store_df.rdd.map(
lambda x:
GroupedDataNamedTuple(x,
str(usage_fetch_operation),
group_by_columns_list))
instance_usage_json_rdd = grouped_data_rdd_with_operation.map(
FetchQuantity._get_quantity)
sql_context = SQLContext.getOrCreate(record_store_df.rdd.context)
instance_usage_df = \
InstanceUsageUtils.create_df_from_json_rdd(sql_context,
instance_usage_json_rdd)
return instance_usage_df
@classmethod
def __connected_spark_cluster(cls, resource_url, pilot_description=None):
    conf = SparkConf()
    conf.setAppName("Pilot-Spark")
    if pilot_description is not None:
        # Copy any spark.* settings from the pilot description into the Spark config.
        for key in pilot_description:
            if key.startswith("spark"):
                conf.set(key, pilot_description[key])
    conf.setMaster(resource_url)
    print(conf.toDebugString())
    sc = SparkContext(conf=conf)
    sqlCtx = SQLContext(sc)
    pilot = PilotCompute(spark_context=sc, spark_sql_context=sqlCtx)
    return pilot
def getSqlContextInstance(sparkContext):
    # Lazily create a single, shared SQLContext for the given SparkContext.
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']
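A minimal usage sketch for the singleton helper above, assuming a Spark Streaming job where `lines` is an existing DStream of strings (the stream and the `word` column name are assumptions, not part of the original snippet).

from pyspark.sql import Row

def process(time, rdd):
    # Reuse one SQLContext across micro-batches instead of rebuilding it per RDD.
    if rdd.isEmpty():
        return
    sql_context = getSqlContextInstance(rdd.context)
    words_df = sql_context.createDataFrame(rdd.map(lambda w: Row(word=w)))
    words_df.show()

# lines.foreachRDD(process)  # `lines` is an assumed DStream of strings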
def to_data_frame(sc, features, labels, categorical=False):
    """Convert numpy arrays of features and labels into a Spark DataFrame."""
    lp_rdd = to_labeled_point(sc, features, labels, categorical)
    sql_context = SQLContext(sc)
    df = sql_context.createDataFrame(lp_rdd)
    return df
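A brief, hypothetical call of to_data_frame, assuming `sc` is an existing SparkContext and `to_labeled_point` is available in the same module; the data is made up for illustration.

import numpy as np

# Hypothetical data: 100 two-dimensional feature vectors with binary labels.
features = np.random.rand(100, 2)
labels = np.random.randint(0, 2, size=100)
df = to_data_frame(sc, features, labels, categorical=True)
df.printSchema()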
parse_grant_rdd = path_rdd.flatMap(lambda x: pp.parse_medline_grant_id(x))\
    .filter(lambda x: x is not None)\
    .map(lambda x: Row(**x))
grant_df = parse_grant_rdd.toDF()
grant_df.write.parquet(os.path.join(save_dir, 'medline_grant_%s.parquet' % date_update_str),
                       mode='overwrite')
# Memory settings must use the fully qualified spark.* keys for Spark to honor them.
conf = SparkConf().setAppName('medline_spark')\
    .setMaster('local[8]')\
    .set('spark.executor.memory', '8g')\
    .set('spark.driver.memory', '8g')\
    .set('spark.driver.maxResultSize', '0')
if __name__ == '__main__':
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)
    is_update, date_update = update()
    if is_update or not glob(os.path.join(save_dir, 'medline_*.parquet')):
        process_file(date_update)
    sc.stop()
verbose = opts.verbose
yarn = opts.yarn
inst = opts.inst
date = opts.date
fout = opts.fout
if inst.lower() in ['global', 'phys01', 'phys02', 'phys03']:
    inst = inst.upper()
else:
    raise Exception('Unsupported DBS instance "%s"' % inst)
# Create spark context
ctx = spark_context('cms', yarn, verbose)
# Create SQL context to be used for SQL queries
sql_context = SQLContext(ctx)
# Initialize DBS tables (will be used with AAA, CMSSW)
dbs_tables(sql_context, inst=inst, verbose=verbose)
aaa_start_time = time.time()
run_aaa(date, fout, ctx, sql_context, verbose)
aaa_elapsed_time = elapsed_time(aaa_start_time)
cmssw_start_time = time.time()
run_cmssw(date, fout, ctx, sql_context, verbose)
cmssw_elapsed_time = elapsed_time(cmssw_start_time)
eos_start_time = time.time()
def run(fout, yarn=None, verbose=None, patterns=None, antipatterns=None, inst='GLOBAL'):
    """
    Main function to run the pyspark job. It requires a schema file, an HDFS
    directory with data, and an optional script with mapper/reducer functions.
    """
    # Define the Spark context; it is the main object used to communicate with Spark.
    ctx = spark_context('cms', yarn, verbose)
    sqlContext = SQLContext(ctx)
    # Read DBS and PhEDEx tables.
    tables = {}
    tables.update(dbs_tables(sqlContext, inst=inst, verbose=verbose))
    tables.update(phedex_tables(sqlContext, verbose=verbose))
    phedex_df = tables['phedex_df']
    daf = tables['daf']
    ddf = tables['ddf']
    bdf = tables['bdf']
    fdf = tables['fdf']
    aef = tables['aef']
    pef = tables['pef']
    mcf = tables['mcf']
    ocf = tables['ocf']
    rvf = tables['rvf']
def toDF(self):
    """
    Converts this GenomicDataset into a DataFrame.

    :return: Returns a dataframe representing this genomic dataset.
    """
    return DataFrame(self._jvmRdd.toDF(), SQLContext(self.sc))
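A short usage sketch for the method above, assuming `genomic_dataset` is an already-loaded GenomicDataset (the variable name is an assumption).

# Hypothetical usage: convert to a Spark DataFrame and inspect it.
df = genomic_dataset.toDF()
df.printSchema()
df.show(5)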