from pyspark.sql.types import (ArrayType, DoubleType, IntegerType, LongType,
                               NullType, StringType, StructField, StructType,
                               TimestampType)


def schema_file_lumis():
    """DBS FILE_LUMIS table schema

    :returns: StructType consisting of an array of StructFields
    """
    return StructType([
        StructField("fl_run_num", IntegerType(), True),
        StructField("fl_lumi_section_num", StringType(), True),
        StructField("fl_file_id", IntegerType(), True)
    ])
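
# Usage sketch: supply the schema when reading a FILE_LUMIS dump so Spark
# skips schema inference; the CSV path below is a placeholder.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
file_lumis = spark.read.csv('/path/to/file_lumis.csv',
                            schema=schema_file_lumis())
file_lumis.printSchema()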

# The DATACATALOG_*_ITEM_SCHEMA structs referenced below are defined
# elsewhere in the source module.
METASTORE_PARTITION_SCHEMA = StructType([
    StructField('database', StringType(), False),
    StructField('table', StringType(), False),
    StructField('item', DATACATALOG_PARTITION_ITEM_SCHEMA, True),
    StructField('type', StringType(), False)
])

METASTORE_DATABASE_SCHEMA = StructType([
    StructField('item', DATACATALOG_DATABASE_ITEM_SCHEMA, True),
    StructField('type', StringType(), False)
])

METASTORE_TABLE_SCHEMA = StructType([
    StructField('database', StringType(), False),
    StructField('type', StringType(), False),
    StructField('item', DATACATALOG_TABLE_ITEM_SCHEMA, True)
])

def append(l, elem):
    """Append elem to list l (ignoring None) and return the modified list"""
    if elem is not None:
        l.append(elem)
    return l


def extend(l1, l2):
    """Extend l1 with l2 and return l1 modified"""
    l1.extend(l2)
    return l1
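
# Usage sketch: `append` and `extend` are shaped to serve as the seqFunc and
# combFunc of RDD.aggregateByKey, gathering all non-None values per key into
# one list; the local SparkContext here is only for illustration.
from pyspark import SparkContext

_sc = SparkContext.getOrCreate()
_pairs = _sc.parallelize([(1, 'a'), (1, 'b'), (2, 'c'), (1, None)])
# [] is the per-key zero value; append folds values in within a partition,
# extend merges the partial lists across partitions.
print(sorted(_pairs.aggregateByKey([], append, extend).collect()))
# -> [(1, ['a', 'b']), (2, ['c'])]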

from typing import Any, List, Optional

from pyspark.sql import DataFrame, Row


def simplify_datawriter_paths(j_paths: Any) -> List[Row]:
    """Flatten the JVM DataWriter's per-fold path mapping into Rows."""
    rows = []
    for fold_id, j_fold in enumerate(j_paths):
        for split_name, file_path in dict(j_fold).items():
            rows.append(Row(
                vec_format='svmrank',
                split_name=split_name,
                path=file_path,
                fold_id=fold_id))
    return rows
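
# Quick illustration with plain dicts standing in for the JVM fold objects
# (anything dict() accepts works): one mapping per fold, keyed by split name.
_folds = [{'train': '/tmp/f0.train', 'test': '/tmp/f0.test'},
          {'train': '/tmp/f1.train', 'test': '/tmp/f1.test'}]
for _row in simplify_datawriter_paths(_folds):
    print(_row.fold_id, _row.split_name, _row.path)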

# T is pyspark.sql.types; mt is the surrounding project's own types module.
# wikiid doesn't exist at this level; the dataframes already
# represent a single wiki, and that is added back in later.
TrainingFilesNoWiki = T.StructType([
    field for field in mt.TrainingFiles if field.name != "wikiid"])
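
# The same field-filtering pattern with a concrete StructType, for
# illustration (a StructType iterates over its StructFields):
_full = StructType([StructField("wikiid", StringType(), True),
                    StructField("path", StringType(), True)])
_no_wiki = StructType([f for f in _full if f.name != "wikiid"])
print(_no_wiki.fieldNames())  # -> ['path']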

def convert_mllib_to_svmrank(
    path_format: str, fold_col: Optional[str], num_folds: int
) -> mt.Transformer:
    if fold_col is None and num_folds != 1:
        raise ValueError(
            'num_folds must be 1 when fold_col is None, got: {}'.format(num_folds))

    def transform(df: DataFrame) -> DataFrame:
        sc = df.sql_ctx.sparkSession.sparkContext
        jsc, jvm = sc._jsc, sc._jvm  # type: ignore
        writer = jvm.org.wikimedia.search.mjolnir.DataWriter(jsc, False)
        jdf = df._jdf  # type: ignore
        # ... (the rest of the transform is elided in the source snippet)

# Fragment of a metastore-migration method: join the skewed-value location
# map to the stringified skewed values.
skewed_value_str_with_loc = ms_skewed_col_value_loc_map.join(
    other=skewed_value_str,
    on=[ms_skewed_col_value_loc_map['STRING_LIST_ID_KID'] == skewed_value_str['STRING_LIST_ID']],
    how='inner')

# columns: (SD_ID: BigInt, skewedColumnValueLocationMaps: Map[String, String])
skewed_column_value_location_maps = self.kv_pair_to_map(
    df=skewed_value_str_with_loc,
    id_col='SD_ID',
    key='skewedColumnValuesStr',
    value='LOCATION',
    map_col_name='skewedColumnValueLocationMaps')

# columns: (SD_ID: BigInt, skewedColumnValues: List[String])
skewed_column_values = self.sql_context.createDataFrame(
    data=skewed_value_str_with_loc.rdd.map(
        lambda row: (row['SD_ID'], row['skewedColumnValues'])
    ).aggregateByKey([], append, extend),
    schema=StructType([
        StructField(name='SD_ID', dataType=LongType()),
        StructField(name='skewedColumnValues',
                    dataType=ArrayType(elementType=StringType()))
    ]))

return skewed_column_values, skewed_column_value_location_maps

def transform_ms_columns(self, ms_columns):
    """Collapse metastore column rows into ordered per-CD_ID column arrays."""
    return self.transform_df_with_idx(
        df=ms_columns,
        id_col='CD_ID',
        idx='INTEGER_IDX',
        payloads_column_name='columns',
        payload_type=StructType([
            StructField(name='name', dataType=StringType()),
            StructField(name='type', dataType=StringType()),
            StructField(name='comment', dataType=StringType())]),
        payload_func=lambda row: (
            row['COLUMN_NAME'], row['TYPE_NAME'], row['COMMENT']))

def makeSchema(self, columns):
    """Build a StructType from (name, type-name) pairs."""
    # Note: 'date' columns are mapped to TimestampType here.
    struct_field_map = {'string': StringType(),
                        'date': TimestampType(),
                        'double': DoubleType(),
                        'int': IntegerType(),
                        'none': NullType()}
    fields = [StructField(k, struct_field_map[v], True) for k, v in columns]
    return StructType(fields)
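
# Usage sketch (hypothetical standalone call; makeSchema is an instance
# method in the source, so `self` is passed as None here):
_schema = makeSchema(None, [('name', 'string'), ('age', 'int'),
                            ('score', 'double')])
print(_schema.simpleString())
# -> struct<name:string,age:int,score:double>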
StructField("aggregated_metric_name",
StringType(), True),
StructField("pre_hourly_group_by_list",
ArrayType(StringType(),
containsNull=False),
True),
StructField("pre_hourly_operation",
StringType(), True),
StructField("aggregation_pipeline",
StructType([source, usage,
setters, insert]),
True)
]), True)
metric_id = StructField("metric_id", StringType(), True)
schema = StructType([aggregation_params_map, metric_id])
return schema