Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_and_set_status(campaign):
    """
    Check that run statuses can be read back and bulk-updated.

    The supplied campaign object is expected to already contain runs named
    ``Run_1`` through ``Run_1010``, every one of them reporting ``Status.NEW``.
    After a single ``set_run_statuses`` call, all of them must report
    ``Status.ENCODED``.
    """
    names = ['Run_' + str(index) for index in range(1, 1011)]

    # Before any update every run must still be NEW.
    initial = [campaign.get_run_status(name) for name in names]
    assert all(status == Status.NEW for status in initial)

    # One bulk update should move every run to ENCODED.
    campaign.set_run_statuses(names, Status.ENCODED)
    updated = [campaign.get_run_status(name) for name in names]
    assert all(status == Status.ENCODED for status in updated)
"""
print('collating data...')
decoder = campaign._active_app_decoder
if decoder.output_type != OutputType.SAMPLE:
raise RuntimeError('Can only aggregate sample type data')
# Aggregate any uncollated runs into a dataframe (for appending to existing full df)
new_data = {}
qoi_cols = campaign._active_app_decoder.output_columns
# Loop through all runs with status ENCODED (and therefore not yet COLLATED)
processed_run_IDs = []
for run_id, run_info in campaign.campaign_db.runs(
status=constants.Status.ENCODED, app_id=app_id):
# Use decoder to check if run has completed (in general application-specific)
if decoder.sim_complete(run_info=run_info):
run_data = decoder.parse_sim_output(run_info=run_info)
new_data[run_id] = {}
for qoi in qoi_cols:
new_data[run_id][qoi] = run_data[qoi].values
processed_run_IDs.append(run_id)
self.append_data(campaign, new_data)
campaign.campaign_db.set_run_statuses(processed_run_IDs, constants.Status.COLLATED)
print('done.')
return len(processed_run_IDs)
def scan_completed(self, *args, **kwargs):
    """
    Look up the runs that count as completed, i.e. those whose status is
    COLLATED.

    Positional and keyword arguments are accepted but not used here.

    Returns
    -------
    list of runs
        Whatever ``self.list_runs`` yields when filtered on
        ``Status.COLLATED``.
    """
    collated_runs = self.list_runs(status=Status.COLLATED)
    return collated_runs
"""
filter_options = {'run_name': run_name}
if campaign:
filter_options['campaign'] = campaign
if sampler:
filter_options['sampler'] = sampler
selected = self.session.query(RunTable).filter_by(**filter_options)
if selected.count() != 1:
logging.critical('Multiple runs selected - using the first')
selected = selected.first()
return constants.Status(selected.status)
# TODO: Add version check
self._campaign_info = input_info['campaign']
self._app = input_info.get('app', {})
self._runs = input_info.get('runs', {})
self._sample = input_info.get('sample', {})
self._collation_csv = input_info.get('collation_csv', {})
self._next_run = input_info['next_run']
self._next_ensemble = input_info['next_ensemble']
self._app['params'] = ParamsSpecification.deserialize(self._app['params'])
# Convert run statuses to enums
for run_id in self._runs:
self._runs[run_id]['status'] = constants.Status(self._runs[run_id]['status'])
'run_dir': self.run_dir,
'params': json.dumps(self.params, default=convert_nonserializable),
'status': constants.Status(self.status),
'campaign': self.campaign,
'sample': self.sample,
'app': self.app,
}
else:
out_dict = {
'run_name': self.run_name,
'ensemble_name': self.ensemble_name,
'run_dir': self.run_dir,
'params': self.params,
'status': constants.Status(self.status),
'campaign': self.campaign,
'sample': self.sample,
'app': self.app,
}
return out_dict
# make a row for every sim_output value
for i, output_val in enumerate(sim_output.columns.tolist()):
for param, value in params.items():
run_data.loc[i, param] = value
run_data.loc[i, 'Variable'] = output_val
run_data.loc[i, 'Value'] = sim_output.loc[0, output_val]
run_data.loc[i, 'run_id'] = run_id
run_data.loc[i, 'ensemble_id'] = run_info['ensemble_name']
new_data = new_data.append(run_data, ignore_index=True)
processed_run_IDs.append(run_id)
self.append_data(campaign, new_data, app_id)
campaign.campaign_db.set_run_statuses(processed_run_IDs, constants.Status.COLLATED)
return len(processed_run_IDs)
def apply_for_each_run_dir(self, action, status=Status.ENCODED):
"""
For each run in this Campaign's run list, apply the specified action
(an object of type Action)
Parameters
----------
action : the action to be applied to each run directory
The function to be applied to each run directory. func() will
be called with the run directory path as its only argument.
Returns
-------
"""
# Loop through all runs in this campaign with status ENCODED, and
# run the specified action on each run's dir
`int`:
The number of new data rows added during collation
"""
decoder = campaign._active_app_decoder
if decoder.output_type != OutputType.SAMPLE:
raise RuntimeError('Can only aggregate sample type data')
# Aggregate any uncollated runs into a dataframe (for appending to existing full df)
new_data = pd.DataFrame()
# Loop through all runs with status ENCODED (and therefore not yet COLLATED)
processed_run_IDs = []
for run_id, run_info in campaign.campaign_db.runs(
status=constants.Status.ENCODED, app_id=app_id):
# Use decoder to check if run has completed (in general application-specific)
if decoder.sim_complete(run_info=run_info):
run_data = decoder.parse_sim_output(run_info=run_info)
if self.average:
run_data = pd.DataFrame(run_data.mean()).transpose()
mult = isinstance(run_data.columns, pd.MultiIndex)
params = run_info['params']
for param, value in params.items():
if isinstance(value, list):
# need to have multi-index dataframe
if not mult:
run_row: RunTable
Information on a particular run in the database.
Returns
-------
dict:
Contains run information (keys = run_name, params, status, sample,
campaign and app)
"""
run_info = {
'run_name': run_row.run_name,
'ensemble_name': run_row.ensemble_name,
'params': json.loads(run_row.params),
'status': constants.Status(run_row.status),
'sample': run_row.sample,
'campaign': run_row.campaign,
'app': run_row.app,
'run_dir': run_row.run_dir
}
return run_info