# Rebuild a data asset for its persistent store: read the picked-up source files
# into a Spark DataFrame and write them back through the asset's I/O helper.
# PySparkDataAsset and PySparkDataAssetIO are project-specific helpers.
import pyspark.sql


def rebuild_for_store(asset: PySparkDataAsset, airflow_context):
    spark_session = pyspark.sql.SparkSession.builder.getOrCreate()
    enrollment_data = PySparkDataAssetIO.read_data_asset(
        asset=asset,
        source_files=asset.pickedup_files(airflow_context),
        spark_session=spark_session,
        header=True,
        inferSchema=True,
    )
    PySparkDataAssetIO.write_data_asset(asset=asset, data=enrollment_data)
    spark_session.stop()
def _read_parquet_spark(self, hash_list):
    """Read the Parquet object files for the given hashes into a single DataFrame."""
    from pyspark import sql as sparksql

    spark = sparksql.SparkSession.builder.getOrCreate()
    objfiles = [self.object_path(h) for h in hash_list]
    # spark.read.parquet accepts multiple paths and unions them into one DataFrame
    dataframe = spark.read.parquet(*objfiles)
    return dataframe
def detach_spark_logger(self):
    if os.getenv("DBND__LOG_SPARK"):
        spark_log_file = self.local_spark_log_file
        try:
            from pyspark.sql import SparkSession

            spark = SparkSession.builder.getOrCreate()
            # Reach into the JVM and remove the log4j appender that was attached
            # (by name) for this task's Spark log file.
            jvm = spark._jvm
            log4j = jvm.org.apache.log4j
            spark_logger = log4j.Logger.getLogger("org.apache.spark")
            spark_logger.removeAppender(spark_log_file.path)
        except Exception as task_ex:
            logger.error(
                "Failed to detach spark logger for log %s: %s",
                spark_log_file,
                task_ex,
            )
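# For context, a hedged sketch of the matching "attach" step that the detach above
# undoes: build a log4j FileAppender through the JVM gateway, name it after the log
# file path (removeAppender above removes it by that name), and add it to the
# "org.apache.spark" logger. This assumes a log4j 1.x-based Spark build; the path
# and layout pattern are illustrative assumptions, not the library's actual code.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
jvm = spark._jvm
log4j = jvm.org.apache.log4j
layout = log4j.PatternLayout("%d{ISO8601} %p %c: %m%n")
appender = log4j.FileAppender(layout, "/tmp/task_spark.log", True)  # append=True
appender.setName("/tmp/task_spark.log")
log4j.Logger.getLogger("org.apache.spark").addAppender(appender)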
# coding: utf-8

# A quick walkthrough of wrangling a DataFrame in HoloClean

# In[23]:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
data = spark.read.csv("../../../datasets/hospital1k/groundtruth.csv", header=True, encoding='utf-8')
out_path = "../../../datasets/hospital1k/groundtruth_norm.csv"

# Take note of the large number of variants on 'Chicago' in this dataset (a quick way
# to inspect them is sketched after this snippet). Our wrangler attempts to merge
# these values into one.

# In[24]:

from wrangler import Wrangler

wrangler = Wrangler()
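# A hedged way to look at the 'Chicago' variants mentioned above before wrangling:
# group the loaded DataFrame by the city column and count each spelling. "City" is
# an assumed column name for the ground-truth file, not confirmed by this excerpt.
data.groupBy("City").count().orderBy("count", ascending=False).show(20, truncate=False)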
def _file_to_spark_data_frame(ext, path, handler_args):
    from pyspark import sql as sparksql

    ext = ext.lower()  # ensure that case doesn't matter
    # PANDAS_PARSERS maps file extensions to pandas reader configs (defined elsewhere)
    logic = PANDAS_PARSERS.get(ext)
    kwargs = dict(logic['kwargs'])
    kwargs.update(handler_args)
    spark = sparksql.SparkSession.builder.getOrCreate()
    dataframe = None
    reader = None
    # FIXME: Add json support?
    if logic['attr'] == "read_csv":
        sep = kwargs.get('sep')
        reader = spark.read.format("csv").option("header", "true")
        if sep:
            reader = reader.option("delimiter", sep)
        dataframe = reader.load(path)
        # Rename columns so that each name is a valid identifier
        for col in dataframe.columns:
            pcol = to_identifier(col)
            if col != pcol:
                dataframe = dataframe.withColumnRenamed(col, pcol)
    else:
        # Fall back to the pandas-based reader for other extensions
        dataframe = _file_to_data_frame(ext, path, handler_args)
    return dataframe
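# to_identifier is not shown in this excerpt. A hypothetical sketch of what such a
# helper typically does is below: lower-case the name, replace characters that are not
# valid in an identifier, and prefix names starting with a digit. This is an
# illustration, not the project's actual implementation.
import re

def to_identifier(name):
    ident = re.sub(r"\W+", "_", name.strip().lower())
    if ident and ident[0].isdigit():
        ident = "_" + ident
    return ident

# e.g. to_identifier("Total Sales (USD)") -> "total_sales_usd_"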
# Setting class variables
if data is not None:
    self.data = data
if inputCol is not None:
    self.inputCol = inputCol
if outputCol is not None:
    self.outputCol = outputCol
if self.data is None:
    raise ValueError(
        "Class variable data is not defined, please pass "
        "in a dataframe into the data parameter"
    )

session = SparkSession.builder.getOrCreate()
AMINO_ACIDS21 = self.AMINO_ACIDS21

# Encoder function to be passed as a User Defined Function (UDF):
# one-hot encodes a sequence string over the 21-letter amino acid alphabet.
def _encoder(s):
    values = [0] * len(AMINO_ACIDS21) * len(s)
    for i in range(len(s)):
        if s[i] in AMINO_ACIDS21:
            index = AMINO_ACIDS21.index(s[i])
        else:
            # unknown residues map to 'X'
            index = AMINO_ACIDS21.index('X')
        values[i * len(AMINO_ACIDS21) + index] = 1
    return values
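# A hedged sketch of how an encoder like _encoder is typically applied: wrap it in a
# Spark UDF that returns an array of integers, then use it to add the output column.
# It reuses the data/inputCol/outputCol attributes set above; the assignment to
# `encoded` is illustrative rather than taken from the original class.
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

encode_udf = udf(_encoder, ArrayType(IntegerType()))
encoded = self.data.withColumn(self.outputCol, encode_udf(self.data[self.inputCol]))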
def setModel(self, modelSchema):
    # Convert the model schema to its Java representation and set it on the wrapped Java object
    session = SparkSession.builder.getOrCreate()
    self._java_obj = self._java_obj.setModel(modelSchema.toJava(session))
    return self
# find all interactions
row = structures.flatMap(PolymerInteractionFingerprint(interactionFilter))

# convert RDD to a Dataset with the following columns
nullable = False
fields = [
    StructField("structureChainId", StringType(), nullable),
    StructField("queryChainId", StringType(), nullable),
    StructField("targetChainId", StringType(), nullable),
    StructField("groupNumbers", ArrayType(StringType(), nullable), nullable),
    StructField("sequenceIndices", ArrayType(IntegerType(), nullable), nullable),
    StructField("sequence", StringType(), nullable),
]
schema = StructType(fields)

spark = SparkSession.builder.getOrCreate()
return spark.createDataFrame(row, schema)
    parameters are returned in each row.

    Parameters
    ----------
    structures : PythonRDD
        a set of PDB structures
    interactionFilter : InteractionFilter
        filter criteria for determining noncovalent interactions

    Returns
    -------
    dataset
        Dataset of pairwise interactions
    '''
    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    # calculate interactions
    pairwise = False
    rows = structures.flatMap(StructureToAtomInteractions(
        sc.broadcast(interactionFilter), pairwise))

    # convert PythonRDD to Dataset
    return spark.createDataFrame(
        rows, AtomInteraction().get_schema(interactionFilter.get_max_interactions()))
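# A hedged usage sketch for an interaction-extraction function like the one above.
# The enclosing function's name is not shown in this excerpt, so calc_atom_interactions
# is a hypothetical stand-in; the PDB IDs and distance cutoff are illustrative, and the
# mmtf-pyspark reader/filter calls are assumptions based on that library's typical API.
from mmtfPyspark.io import mmtfReader
from mmtfPyspark.interactions import InteractionFilter

structures = mmtfReader.download_mmtf_files(["1OHR", "4HHB"])  # PythonRDD of structures

interaction_filter = InteractionFilter()
interaction_filter.set_distance_cutoff(4.5)

interactions = calc_atom_interactions(structures, interaction_filter)  # hypothetical name
interactions.show(10)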