Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Collect, per omnibus table, the partial tables stored by each sub-process pipeline,
# then write the mirrored tables and the concatenated omnibus tables into the
# (re)opened pipeline and checkpoint the result.
omnibus_tables = {table_name: [] for table_name in omnibus_keys}

for process_name in sub_proc_names:
    pipeline_path = config.build_output_file_path(pipeline_file_name, use_prefix=process_name)
    logger.info("coalesce pipeline %s", pipeline_path)

    # read-only: sub-process stores are only read here, never modified
    with pd.HDFStore(pipeline_path, mode='r') as pipeline_store:
        for table_name, hdf5_key in iteritems(omnibus_keys):
            table_slice = pipeline_store[hdf5_key]
            omnibus_tables[table_name].append(table_slice)

pipeline.open_pipeline()

# - mirrored tables are written as-is
for table_name in mirrored_tables:
    df = mirrored_tables[table_name]
    logger.info("adding mirrored table %s %s", table_name, df.shape)
    pipeline.replace_table(table_name, df)

# - omnibus tables are the concatenation of the slices read from every sub-process
for table_name, table_slices in omnibus_tables.items():
    df = pd.concat(table_slices, sort=False)
    logger.info("adding omnibus table %s %s", table_name, df.shape)
    pipeline.replace_table(table_name, df)

pipeline.add_checkpoint(checkpoint_name)

pipeline.close_pipeline()
# Build the incidence table, then decorate it with geography columns.
incidence_table = add_geography_columns(
    build_incidence_table(control_spec, households_df, persons_df, crosswalk_df),
    households_df,
    crosswalk_df)

# sample_weight column is copied from the configured household weight column
hh_weight_col = setting('household_weight_col')
incidence_table['sample_weight'] = households_df[hh_weight_col]

# rebuild control tables with only the low level controls (aggregated at higher levels)
for g in geographies:
    controls = build_control_table(g, control_spec, crosswalk_df)
    pipeline.replace_table(control_table_name(g), controls)

# Store the plain incidence table unless grouping by incidence signature is enabled
# and integerization has not been disabled; in that case store the grouped table
# together with its household_groups table.
if not setting('GROUP_BY_INCIDENCE_SIGNATURE') or setting('NO_INTEGERIZATION_EVER', False):
    pipeline.replace_table('incidence_table', incidence_table)
else:
    group_incidence_table, household_groups = build_grouped_incidence_table(
        incidence_table, control_spec, seed_geography)

    pipeline.replace_table('household_groups', household_groups)
    pipeline.replace_table('incidence_table', group_incidence_table)
# Assign a purpose to every trip and write the updated trips table back to the pipeline.
trips_df = trips.to_frame()

choices = run_trip_purpose(trips_df,
                           chunk_size=chunk_size,
                           trace_hh_id=trace_hh_id,
                           trace_label=trace_label)

trips_df['purpose'] = choices

# every trip must have received a purpose
assert not trips_df['purpose'].isnull().any()

pipeline.replace_table("trips", trips_df)

if trace_hh_id:
    tracing.trace_df(
        trips_df,
        label=trace_label,
        slicer='trip_id',
        index_label='trip_id',
        warn_if_empty=True)
# NOTE(review): this call may be the last statement of a loop body whose header lies
# outside this excerpt - confirm its nesting before relying on the layout below.
force_garbage_collect()

# Combine the collected choices and store them as the trip_mode column of trips.
choices = pd.concat(choices_list)

trips_df = trips.to_frame()
trips_df['trip_mode'] = choices

tracing.print_summary('tour_modes', trips_merged.tour_mode, value_counts=True)
tracing.print_summary('trip_mode_choice choices', choices, value_counts=True)

# every trip must have received a mode
assert not trips_df['trip_mode'].isnull().any()

pipeline.replace_table("trips", trips_df)

if trace_hh_id:
    trip_mode_trace_label = tracing.extend_trace_label(trace_label, 'trip_mode')
    tracing.trace_df(
        trips_df,
        label=trip_mode_trace_label,
        slicer='trip_id',
        index_label='trip_id',
        warn_if_empty=True)
# Run the auto ownership simulation over the merged households table and store
# the resulting choices as the auto_ownership column of households.
choices = simulate.simple_simulate(choosers=households_merged.to_frame(),
                                   spec=model_spec,
                                   nest_spec=nest_spec,
                                   locals_d=constants,
                                   chunk_size=chunk_size,
                                   trace_label=trace_label,
                                   trace_choice_name='auto_ownership')

# from here on `households` is the DataFrame, not the pipeline table wrapper
households = households.to_frame()

# no need to reindex as we used all households
households['auto_ownership'] = choices

pipeline.replace_table("households", households)

tracing.print_summary('auto_ownership', households['auto_ownership'], value_counts=True)

if trace_hh_id:
    tracing.trace_df(households, label='auto_ownership', warn_if_empty=True)
configs_dir : str
households: pipeline table
persons: pipeline table
Returns
-------
"""
# Read the geography settings; the last entry of `geographies` is treated as the
# lowest-level geography.
seed_geography = setting('seed_geography')
geographies = setting('geographies')
low_geography = geographies[-1]

# replace crosswalk table
crosswalk_df = build_crosswalk_table()
pipeline.replace_table('crosswalk', crosswalk_df)

# replace control_spec
control_file_name = setting('repop_control_file_name', 'repop_controls.csv')
control_spec = read_control_spec(control_file_name, configs_dir)

# repop control spec should only specify controls for lowest level geography
# Compare plain lists: `ndarray == list` produces an array, and asserting on it raises
# ValueError (ambiguous truth value) instead of AssertionError as soon as more than one
# geography is present in the spec.
repop_geographies = list(control_spec.geography.unique())
assert repop_geographies == [low_geography], \
    "repop control spec should only use geography %r but found %s" \
    % (low_geography, repop_geographies)

pipeline.replace_table('control_spec', control_spec)
# build incidence_table with repop controls and households in repop zones
# filter households (dropping any not in crosswalk) in order to build incidence_table
# We DO NOT REPLACE households and persons as we need full tables to synthesize population
# (There is no problem, however, with overwriting the incidence_table and household_groups
# because the expand_households step has ALREADY created the expanded_household_ids table
# for the original simulated population.)
'skim_od': AccessibilitySkims(skim_dict, orig_zones, dest_zones),
'skim_do': AccessibilitySkims(skim_dict, orig_zones, dest_zones, transpose=True)
}
# Evaluate the assignment spec against od_df, reduce every result column to one value
# per origin zone, and store the result as the "accessibility" pipeline table.

# caller-supplied constants are merged into the locals passed to the expressions
if constants is not None:
    locals_d.update(constants)

results, trace_results, trace_assigned_locals \
    = assign.assign_variables(assignment_spec, od_df, locals_d, trace_rows=trace_od_rows)

for column in results.columns:
    # Arrange the column's values as an (orig_zone_count, dest_zone_count) matrix.
    # reshape() is used rather than assigning to .shape, which NumPy discourages and
    # which fails when the underlying data is not contiguous.
    data = np.asanyarray(results[column]).reshape(orig_zone_count, dest_zone_count)
    # sum across axis 1 (one total per row), then take log(total + 1)
    accessibility_df[column] = np.log(np.sum(data, axis=1) + 1)

# - write table to pipeline
pipeline.replace_table("accessibility", accessibility_df)

if trace_od:

    if not trace_od_rows.any():
        # lazy logger arguments, consistent with the other logging calls in this file
        logger.warning("trace_od not found origin = %s, dest = %s", trace_orig, trace_dest)
    else:

        # add OD columns to trace results
        df = pd.concat([od_df[trace_od_rows], trace_results], axis=1)

        # dump the trace results table (with _temp variables) to aid debugging
        tracing.trace_df(df,
                         label='accessibility',
                         index_label='skim_offset',
                         slicer='NONE',
                         warn_if_empty=True)
# Store the chosen destinations on the persons table, optionally annotate persons and
# households, and write the updated tables back to the pipeline.
persons_df = persons.to_frame()

# Choices exist only for the subset of persons who go to school, so persons without
# a choice are backfilled with -1 to code "no school location".
NO_DEST_TAZ = -1
dest_choices = choices.reindex(persons_df.index)
persons_df[dest_choice_column_name] = dest_choices.fillna(NO_DEST_TAZ).astype(int)

# - annotate persons table
if 'annotate_persons' in model_settings:
    persons_trace_label = tracing.extend_trace_label(trace_label, 'annotate_persons')
    expressions.assign_columns(
        df=persons_df,
        model_settings=model_settings.get('annotate_persons'),
        trace_label=persons_trace_label)

pipeline.replace_table("persons", persons_df)

if trace_hh_id:
    tracing.trace_df(persons_df, label=trace_label, warn_if_empty=True)

# - annotate households table
if 'annotate_households' in model_settings:
    households_df = households.to_frame()
    households_trace_label = tracing.extend_trace_label(trace_label, 'annotate_households')
    expressions.assign_columns(
        df=households_df,
        model_settings=model_settings.get('annotate_households'),
        trace_label=households_trace_label)
    # households_df only exists in this branch, so the table is replaced here
    pipeline.replace_table("households", households_df)