Path to the first file.
file2 : str
Path to the second file.
file_format : str, optional
The format of the output files.
Defaults to 'csv'.
"""
# make sure that the main id columns are read as strings since
# this may affect merging in custom notebooks
string_columns = ['spkitemid', 'candidate']
converter_dict = {column: str for column in string_columns}
df1 = DataReader.read_from_file(file1, converters=converter_dict)
df2 = DataReader.read_from_file(file2, converters=converter_dict)
# convert all column names to strings
# we do this to avoid any errors during sorting.
for df in [df1, df2]:
df.columns = df.columns.map(str)
# if the first column is numeric, just force the index to string;
# however, if it is non-numeric, assume that it is the ID column
# and set it as the index. We do this to ensure that string
# indices are preserved as such.
for df in [df1, df2]:
if np.issubdtype(df[df.columns[0]].dtype, np.number):
df.index = df.index.map(str)
else:
df.index = df[df.columns[0]]
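# Illustrative sketch (not part of the original source): the index-coercion
# logic above applied to a toy frame. A numeric first column is left alone
# and only the default index is cast to string; a non-numeric first column
# (such as `spkitemid`) becomes the index itself.
import numpy as np
import pandas as pd

df_example = pd.DataFrame({'spkitemid': ['a1', 'a2'], 'score': [1, 2]})
if np.issubdtype(df_example[df_example.columns[0]].dtype, np.number):
    df_example.index = df_example.index.map(str)
else:
    df_example.index = df_example[df_example.columns[0]]
# df_example.index is now Index(['a1', 'a2'], dtype='object', name='spkitemid')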
for name, ext in name_ext_tuples:
if name == 'train':
df = self.df_train
elif name == 'test':
df = self.df_test
elif name == 'feature_specs':
df = self.df_specs
else:
df = self.df_other
path = TestDataReader.make_file_from_ext(df, ext)
names_.append(name)
paths_.append(path)
reader = DataReader(paths_, names_, converters)
container = reader.read()
self.filepaths.extend(paths_)
return container
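# Illustrative sketch (not part of the original source): a minimal example of
# driving DataReader directly, as the helper above does -- pair each file path
# with a frame name, pass per-name converters, and read everything into one
# container. The file names here are hypothetical, and the import path and
# dictionary-style access are assumptions based on the calls shown above.
from rsmtool.reader import DataReader

paths = ['train.csv', 'test.csv']
names = ['train', 'test']
converters = {'train': {'spkitemid': str}, 'test': {'spkitemid': str}}

reader = DataReader(paths, names, converters)
container = reader.read()
df_train = container['train']  # each frame is retrieved by its assigned name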
def check_read_from_file(self, extension):
"""
Test whether the ``read_from_file()`` method works as expected.
"""
name = TestDataReader.make_file_from_ext(self.df_train, extension)
# now read in the file using `read_from_file()`
df_read = DataReader.read_from_file(name,
converters={'id': str, 'candidate': str})
# Make sure the file gets cleaned up at the end, at least if
# we got this far (i.e., no errors were raised)
self.filepaths.append(name)
assert_frame_equal(self.df_train, df_read)
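# Illustrative sketch (not part of the original source): the same
# write-then-read roundtrip as the test above, outside the test class. The
# file name is hypothetical, and this assumes `read_from_file()` wraps the
# corresponding pandas reader so the converters are passed straight through.
import pandas as pd
from pandas.testing import assert_frame_equal

from rsmtool.reader import DataReader

df_expected = pd.DataFrame({'id': ['001', '002'], 'candidate': ['c1', 'c2']})
df_expected.to_csv('roundtrip_example.csv', index=False)
df_read = DataReader.read_from_file('roundtrip_example.csv',
                                    converters={'id': str, 'candidate': str})
assert_frame_equal(df_expected, df_read)  # '001' keeps its leading zero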
if not experiment_dir_old:
raise FileNotFoundError("The directory {} "
"does not exist.".format(configuration['experiment_dir_old']))
csvdir_old = normpath(join(experiment_dir_old, 'output'))
figdir_old = normpath(join(experiment_dir_old, 'figure'))
if not exists(csvdir_old) or not exists(figdir_old):
raise FileNotFoundError("The directory {} does not contain "
"the output of an rsmtool "
"experiment.".format(experiment_dir_old))
check_experiment_id(experiment_dir_old, experiment_id_old)
# get the information about the "new" experiment
experiment_id_new = configuration['experiment_id_new']
experiment_dir_new = DataReader.locate_files(configuration['experiment_dir_new'],
configuration.configdir)
if not experiment_dir_new:
raise FileNotFoundError("The directory {} "
"does not exist.".format(configuration['experiment_dir_new']))
csvdir_new = normpath(join(experiment_dir_new, 'output'))
figdir_new = normpath(join(experiment_dir_new, 'figure'))
if not exists(csvdir_new) or not exists(figdir_new):
raise FileNotFoundError("The directory {} does not contain "
"the output of an rsmtool "
"experiment.".format(experiment_dir_new))
check_experiment_id(experiment_dir_new, experiment_id_new)
# are there specific general report sections we want to include?
general_report_sections = configuration['general_sections']
os.makedirs(figdir, exist_ok=True)
os.makedirs(reportdir, exist_ok=True)
configuration = configure('rsmeval', config_file_or_obj_or_dict)
logger.info('Saving configuration file.')
configuration.save(output_dir)
# Get output format
file_format = configuration.get('file_format', 'csv')
# Get DataWriter object
writer = DataWriter(configuration['experiment_id'])
# Make sure prediction file can be located
if not DataReader.locate_files(configuration['predictions_file'],
configuration.configdir):
raise FileNotFoundError('Error: Predictions file {} '
'not found.\n'.format(configuration['predictions_file']))
scale_with = configuration.get('scale_with')
# scale_with can be one of the following:
# (a) None : the predictions are assumed to be 'raw' and should be used as is
# when computing the metrics; the names for the final columns are
# 'raw', 'raw_trim' and 'raw_trim_round'.
# (b) 'asis' : the predictions are assumed to be pre-scaled and should be used as is
# when computing the metrics; the names for the final columns are
# 'scale', 'scale_trim' and 'scale_trim_round'.
# (c) a CSV file : the predictions are assumed to be 'raw' and should be scaled
# before computing the metrics; the names for the final columns are
# 'scale', 'scale_trim' and 'scale_trim_round'.
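# Illustrative sketch (not part of the original source): a hypothetical helper
# summarizing how the three `scale_with` cases above determine the prefix used
# for the final prediction columns.
def prediction_column_prefix(scale_with):
    """Return 'raw' or 'scale' depending on the `scale_with` setting."""
    if scale_with is None:
        # (a) raw predictions used as is -> 'raw', 'raw_trim', 'raw_trim_round'
        return 'raw'
    # (b) 'asis' (pre-scaled) and (c) a CSV file (scale first) both yield
    # 'scale', 'scale_trim', 'scale_trim_round'
    return 'scale'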
feature_info = join(experiment_output_dir,
'{}_feature.csv'.format(experiment_id))
post_processing = join(experiment_output_dir,
'{}_postprocessing_params.csv'.format(experiment_id))
file_paths = [input_features_file, feature_info, post_processing]
file_names = ['input_features',
'feature_info',
'postprocessing_params']
converters = {'input_features': configuration.get_default_converter()}
# Initialize the reader
reader = DataReader(file_paths, file_names, converters)
data_container = reader.read(kwargs_dict={'feature_info': {'index_col': 0}})
# load the Modeler to generate the predictions
model = Modeler.load_from_file(join(experiment_output_dir,
'{}.model'.format(experiment_id)))
# Add the model to the configuration object
configuration['model'] = model
# Initialize the processor
processor = FeaturePreprocessor()
(processed_config,
processed_container) = processor.process_data(configuration,
data_container,
context='rsmpredict')
paths.append('scale_with')
names.append('scale')
# Get the paths, names, and converters for the DataReader
(file_names,
file_paths) = configuration.get_names_and_paths(paths, names)
file_paths = DataReader.locate_files(file_paths, configuration.configdir)
converters = {'predictions': configuration.get_default_converter()}
logger.info('Reading predictions: {}.'.format(configuration['predictions_file']))
# Initialize the reader
reader = DataReader(file_paths, file_names, converters)
data_container = reader.read()
logger.info('Preprocessing predictions.')
# Initialize the processor
processor = FeaturePreprocessor()
(processed_config,
processed_container) = processor.process_data(configuration,
data_container,
context='rsmeval')
logger.info('Saving pre-processed predictions and metadata to disk.')
writer.write_experiment_output(csvdir,
processed_container,
new_names_dict={'pred_test':
Raise a warning if the file cannot be located.
Defaults to False.
Returns
-------
df : pd.DataFrame or None
DataFrame containing the data in the given file,
or None if the file does not exist.
Raises
------
FileNotFoundError
If `raise_error` is True and the file cannot be located.
"""
if exists(filename):
return DataReader.read_from_file(filename, converters, **kwargs)
message = 'The file `{}` could not be located.'.format(filename)
if raise_error:
raise FileNotFoundError(message)
if raise_warning:
warnings.warn(message)
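# Illustrative sketch (not part of the original source): a standalone version
# of the locate-or-read behavior documented above, showing the three possible
# outcomes for a missing file -- return None silently, warn, or raise. The
# function name `load_if_exists` and the file name are hypothetical.
import warnings
from os.path import exists

import pandas as pd


def load_if_exists(filename, raise_error=False, raise_warning=False):
    """Read a CSV file if it exists; otherwise warn, raise, or return None."""
    if exists(filename):
        return pd.read_csv(filename)
    message = 'The file `{}` could not be located.'.format(filename)
    if raise_error:
        raise FileNotFoundError(message)
    if raise_warning:
        warnings.warn(message)


df = load_if_exists('missing.csv')                      # None, silently
df = load_if_exists('missing.csv', raise_warning=True)  # None, with a warning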