# Scraped boilerplate (Snyk advertisement), kept as a comment so the file parses:
# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def convert_numbers_to_string(name, lookup, column):
    """Replace every numeric code in ``column`` with its string from ``lookup``.

    ``lookup`` maps (stringified) integer codes to labels; ``column`` is a
    pandas Series of integer codes. Raises ``Exception`` listing any codes
    that ``lookup`` does not cover.
    """
    lookup_values = np.array(list(lookup.values()))
    result = np.empty_like(column).astype(lookup_values.dtype)
    result[:] = ""

    for key, label in lookup.items():
        result[column.values == int(key)] = label

    # Entries still equal to "" were never matched by any lookup key.
    unconverted = result == ""
    if np.any(unconverted):
        print(lookup)
        print(np.where(unconverted))
        print(column[unconverted].values)
        unconverted_entries = np.unique(column[unconverted])
        raise Exception(
            "The conversion lookup list for converting {} is incomplete. "
            "The following data numbers were not converted:\n"
            "{}\n"
            "Please update the trf2csv conversion script to include these "
            "in its definitions.".format(name, unconverted_entries)
        )

    return result
def get_mosaiq_delivery_data_bygantry(mosaiq_delivery_data):
    """Split a Mosaiq delivery record into per-gantry-angle chunks.

    Returns a dict keyed by unique gantry angle; each value is a dict with
    ``"mu"`` (monitor units restarted at 0 for that angle), ``"mlc"`` and
    ``"jaw"`` (the control points recorded at that angle).

    BUG FIX: the original built the dict but never returned it (the function
    returned ``None``), and computed ``jaw`` without ever storing it.
    """
    mu = np.array(mosaiq_delivery_data.monitor_units)
    mlc = np.array(mosaiq_delivery_data.mlc)
    jaw = np.array(mosaiq_delivery_data.jaw)
    gantry_angles = np.array(mosaiq_delivery_data.gantry)

    unique_mosaiq_gantry_angles = np.unique(gantry_angles)

    mosaiq_delivery_data_bygantry = dict()
    for mosaiq_gantry_angle in unique_mosaiq_gantry_angles:
        gantry_angle_matches = gantry_angles == mosaiq_gantry_angle

        # MU is cumulative over the whole delivery; take per-control-point
        # increments, keep only this angle's, then re-accumulate from zero.
        diff_mu = np.concatenate([[0], np.diff(mu)])[gantry_angle_matches]
        gantry_angle_specific_mu = np.cumsum(diff_mu)

        mosaiq_delivery_data_bygantry[mosaiq_gantry_angle] = {
            "mu": gantry_angle_specific_mu,
            "mlc": mlc[gantry_angle_matches],
            "jaw": jaw[gantry_angle_matches],
        }

    return mosaiq_delivery_data_bygantry
def calc_normalisation(mosaiq_delivery_data):
    """Return the total MU density divided by the number of unique gantry angles."""
    mu_density = mosaiq_delivery_data.mudensity
    gantry_angle_count = len(np.unique(mosaiq_delivery_data.gantry))
    return np.sum(mu_density) / gantry_angle_count
def create_bb_to_minimise(field, bb_diameter):
    """This is a numpy vectorised version of `create_bb_to_minimise_simple`

    Returns a cost function of ``centre`` measuring how much the field values
    on rings of sample points around ``centre`` disagree within each ring.

    BUG FIX: the original defined the closure but never returned it, so the
    function returned ``None``.
    """
    points_to_check_edge_agreement, dist = create_bb_points_function(bb_diameter)
    # Row i of dist_mask selects the sample points at the i-th unique distance.
    dist_mask = np.unique(dist)[:, None] == dist[None, :]
    num_in_mask = np.sum(dist_mask, axis=1)
    # For each point, how many points share its ring (used to normalise).
    mask_count_per_item = np.sum(num_in_mask[:, None] * dist_mask, axis=0)
    # For each point, the index of the ring it belongs to.
    mask_mean_lookup = np.where(dist_mask)[0]

    def to_minimise_edge_agreement(centre):
        x, y = points_to_check_edge_agreement(centre)
        results = field(x, y)
        masked_results = results * dist_mask
        # Mean field value per ring.
        mask_mean = np.sum(masked_results, axis=1) / num_in_mask
        diff_to_mean_square = (results - mask_mean[mask_mean_lookup]) ** 2
        # Skip index 0 (the centre point / first ring) and average over rings.
        mean_of_layers = np.sum(
            diff_to_mean_square[1::] / mask_count_per_item[1::]
        ) / (len(mask_mean) - 1)
        return mean_of_layers

    return to_minimise_edge_agreement
# NOTE(review): truncated fragment — the first three lines are the tail of an
# expression (presumably computing the bottom grid position) whose start,
# together with the enclosing function's signature and the definitions of
# ``min_y``, ``grid_reference_position``, ``grid_resolution``,
# ``top_grid_pos``, ``leaf_centres`` and ``mlc``, lies outside this view.
# Code left byte-identical; indentation was already stripped in SOURCE.
- (np.round((-min_y + grid_reference_position) / grid_resolution))
* grid_resolution
)
# Build the grid coordinates along the jaw (leaf-travel-orthogonal) axis.
grid = dict()
grid["jaw"] = np.arange(
bot_grid_pos, top_grid_pos + grid_resolution, grid_resolution
).astype("float")
# For each jaw grid position, the index of the nearest MLC leaf centre.
grid_leaf_map = np.argmin(
np.abs(grid["jaw"][:, None] - leaf_centres[None, :]), axis=1
)
adjusted_grid_leaf_map = grid_leaf_map - np.min(grid_leaf_map)
# Restrict the MLC array to only the leaves the grid actually touches.
leaves_to_be_calced = np.unique(grid_leaf_map)
adjusted_mlc = mlc[:, leaves_to_be_calced, :]
# Snap the MLC (x) extent to multiples of the grid resolution.
min_x = np.round(np.min(-adjusted_mlc[:, :, 0]) / grid_resolution) * grid_resolution
max_x = np.round(np.max(adjusted_mlc[:, :, 1]) / grid_resolution) * grid_resolution
grid["mlc"] = np.arange(min_x, max_x + grid_resolution, grid_resolution).astype(
"float"
)
return grid, adjusted_grid_leaf_map, adjusted_mlc
# NOTE(review): truncated fragment — the enclosing method's signature and the
# construction of ``masks``, ``mask``, ``is_duplicate_gantry_angles``,
# ``gantry_angles``, ``gantry_tol`` and ``allow_missing_angles`` are outside
# this view (the use of ``self.gantry`` implies this is a method body).
# Code left byte-identical; indentation was already stripped in SOURCE.
np.sum(np.abs(np.diff(np.concatenate([[0], mask, [0]])))) != 2
)
if is_duplicate_gantry_angles:
raise ValueError("Duplicate gantry angles not yet supported")
# Every control point must be captured by exactly one gantry-angle mask.
try:
assert np.all(np.sum(masks, axis=0) == 1), (
"Not all beams were captured by the gantry tolerance of "
" {}".format(gantry_tol)
)
except AssertionError:
if not allow_missing_angles:
# Report which recorded gantry angles fell outside every mask
# before re-raising the assertion for the caller.
print("Allowable gantry angles = {}".format(gantry_angles))
gantry = np.array(self.gantry, copy=False)
out_of_tolerance = np.unique(
gantry[np.sum(masks, axis=0) == 0]
).tolist()
print(
"The gantry angles out of tolerance were {}".format(
out_of_tolerance
)
)
raise
return masks
def create_bb_to_minimise_simple(field, bb_diameter):
    """Build a cost function of ``centre`` measuring, ring by ring, how much
    the field values on sample points around ``centre`` disagree with the
    ring's mean (loop-based reference for `create_bb_to_minimise`).

    BUG FIX: the original defined the closure but never returned it, so the
    function returned ``None``.
    """
    points_to_check_edge_agreement, dist = create_bb_points_function(bb_diameter)
    # Row i of dist_mask selects the sample points at the i-th unique distance.
    dist_mask = np.unique(dist)[:, None] == dist[None, :]

    def to_minimise_edge_agreement(centre):
        x, y = points_to_check_edge_agreement(centre)
        total_minimisation = 0
        # Skip the first ring (index 0), matching the vectorised version.
        for current_mask in dist_mask[1::]:
            current_layer = field(x[current_mask], y[current_mask])
            total_minimisation += np.mean((current_layer - np.mean(current_layer)) ** 2)
        return total_minimisation / (len(dist_mask) - 1)

    return to_minimise_edge_agreement
# NOTE(review): truncated fragment — ``create_bb_points_function`` is defined
# elsewhere in the file, and the body of ``to_minimise_pixel_vals`` is cut off
# after its first statement; the remainder lies outside this view.
# Code left byte-identical; indentation was already stripped in SOURCE.
points_to_check_min_pvals, _ = create_bb_points_function(bb_diameter * 0.5)
def to_minimise_pixel_vals(centre):
x, y = points_to_check_min_pvals(centre)
def get_mu_densities_for_file_hashes(index, config, cursor, file_hashes):
    """Gather per-gantry-angle MU densities for a set of logfile hashes.

    ``index`` presumably maps file hashes to logfile metadata and ``cursor``
    is a Mosaiq database cursor — TODO confirm against the callers.

    NOTE(review): this definition continues past the end of the visible
    chunk (it is cut off at the ``normalisation`` assignment); the visible
    code is left unchanged apart from documentation.
    """
    # Every supplied logfile must belong to exactly one Mosaiq field.
    field_ids = {
        index[file_hash]["delivery_details"]["field_id"] for file_hash in file_hashes
    }
    assert len(field_ids) == 1
    field_id = field_ids.pop()

    # Group logfiles recorded back-to-back so each group is treated as one
    # delivery, then freeze the groups so they can be used as dict keys later.
    logfile_groups = group_consecutive_logfiles(file_hashes, index)
    logfile_groups = [tuple(group) for group in logfile_groups]

    # Mosaiq's planned delivery is the reference the logfiles are compared to.
    mosaiq_delivery_data = pymedphys.Delivery.from_mosaiq(cursor, field_id)
    mosaiq_gantry_angles = np.unique(mosaiq_delivery_data.gantry)

    logfile_delivery_data_bygantry = get_logfile_delivery_data_bygantry(
        index, config, logfile_groups, mosaiq_gantry_angles
    )
    logfile_mu_density_bygantry = get_logfile_mu_density_bygantry(
        logfile_groups, mosaiq_gantry_angles, logfile_delivery_data_bygantry
    )
    mosaiq_delivery_data_bygantry = get_mosaiq_delivery_data_bygantry(
        mosaiq_delivery_data
    )
    mosaiq_mu_density_bygantry = get_mosaiq_mu_density_bygantry(
        mosaiq_delivery_data_bygantry
    )
    normalisation = calc_normalisation(mosaiq_delivery_data)