Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
)
if num_control_points > 0:
mu_density = [
pymedphys.mudensity.grid(),
delivery_data[file_hash][mosaiq_gantry_angle].mudensity(),
]
if (
mosaiq_gantry_angle
not in logfile_mu_density_bygantry[logfile_group]
):
logfile_mu_density_bygantry[logfile_group][
mosaiq_gantry_angle
] = list(mu_density)
else:
assert np.all(
logfile_mu_density_bygantry[logfile_group][
mosaiq_gantry_angle
][0]
== mu_density[0]
)
assert np.all(
logfile_mu_density_bygantry[logfile_group][
mosaiq_gantry_angle
][1]
== mu_density[1]
)
logfile_mu_density_bygantry[logfile_group][mosaiq_gantry_angle][
2
] += mu_density[2]
return logfile_mu_density_bygantry
mosaiq_delivery_data.monitor_units,
mosaiq_delivery_data.mlc,
mosaiq_delivery_data.jaw,
)
delivery_data = cls._from_mosaiq_base(cursor, field_id)
test_data = (delivery_data.monitor_units, delivery_data.mlc, delivery_data.jaw)
agreement = False
while not agreement:
agreements = []
for ref, test in zip(reference_data, test_data):
agreements.append(np.all(ref == test))
agreement = np.all(agreements)
if not agreement:
print("Converted Mosaiq delivery data was conflicting.")
print(
"MU agreement: {}\nMLC agreement: {}\n"
"Jaw agreement: {}".format(*agreements)
)
print("Trying again...")
reference_data = test_data
delivery_data = cls._from_mosaiq_base(cursor, field_id)
test_data = (
delivery_data.monitor_units,
delivery_data.mlc,
delivery_data.jaw,
)
return delivery_data
mu_density_results = [delivery_data.mudensity() for delivery_data in deliveries]
mu_densities = [results[2] for results in mu_density_results]
labels = [
"Server: `{}` | Field ID: `{}`".format(server, field_id)
for server, field_id in zip(servers, field_ids)
]
plot_gantry_collimator(labels, deliveries)
plot_mu_densities(labels, mu_density_results)
mu_densities_match = np.all(
[
np.all(np.abs(mu_density_a - mu_density_b) < 0.1)
for mu_density_a, mu_density_b in itertools.combinations(mu_densities, 2)
]
)
plt.show()
print("MU Densities match: {}".format(mu_densities_match))
return deliveries, mu_densities
def pcolormesh_grid(x, y, grid_resolution=None):
if grid_resolution is None:
diffs = np.hstack([np.diff(x), np.diff(y)])
assert np.all(np.abs(diffs - diffs[0]) < 10 ** -12)
grid_resolution = diffs[0]
new_x = np.concatenate([x - grid_resolution / 2, [x[-1] + grid_resolution / 2]])
new_y = np.concatenate([y - grid_resolution / 2, [y[-1] + grid_resolution / 2]])
return new_x, new_y
def dicom_dataset_from_dict(input_dict: dict, template_ds=None):
"""Create a pydicom DICOM object from a dictionary"""
if template_ds is None:
dataset = pydicom.Dataset()
else:
dataset = deepcopy(template_ds)
for key, value in input_dict.items():
if key not in get_dicom_names():
raise ValueError("{} is not within the DICOM dictionary.".format(key))
if isinstance(value, dict):
setattr(dataset, key, dicom_dataset_from_dict(value))
elif isinstance(value, list):
# TODO: Check for DICOM SQ type on this attribute
if np.all([not isinstance(item, dict) for item in value]):
add_array_to_dataset(dataset, key, value)
elif np.all([isinstance(item, dict) for item in value]):
setattr(dataset, key, [dicom_dataset_from_dict(item) for item in value])
else:
raise ValueError(
"{} should contain either only dictionaries, or no "
"dictionaries".format(key)
)
else:
add_array_to_dataset(dataset, key, value)
return dataset
)
xx, yy = np.meshgrid(x, y)
zz = spline_model_with_deformability(
xx,
convert2_ratio_perim_area(xx, yy),
width_data,
convert2_ratio_perim_area(width_data, length_data),
factor_data,
)
zz[xx > yy] = np.nan
no_data_x = np.all(np.isnan(zz), axis=0)
no_data_y = np.all(np.isnan(zz), axis=1)
x = x[np.invert(no_data_x)]
y = y[np.invert(no_data_y)]
zz = zz[np.invert(no_data_y), :]
zz = zz[:, np.invert(no_data_x)]
return x, y, zz
def assert_array_agreement(
unique_logfile_gantry_angles, mosaiq_gantry_angles, allowed_deviation
):
difference_matrix = np.abs(
unique_logfile_gantry_angles[:, None] - mosaiq_gantry_angles[None, :]
)
agreement_matrix = difference_matrix <= allowed_deviation
row_agreement = np.any(agreement_matrix, axis=1)
at_least_one_agreement = np.all(row_agreement)
assert at_least_one_agreement, (
"There is a logfile gantry angle that deviates by more than {} degrees"
" from the Mosaiq control points. Unsure how to handle this.\n\n"
"Logfile: {}\nMosaiq: {}\nDifference Matrix:\n{}\n"
"Agreement Matrix:\n{}".format(
allowed_deviation,
unique_logfile_gantry_angles,
mosaiq_gantry_angles,
difference_matrix,
agreement_matrix,
)
def _calc_blocked_t(travel_diff, grid_resolution):
blocked_t = np.ones_like(travel_diff) * np.nan
fully_blocked = travel_diff <= -grid_resolution / 2
fully_open = travel_diff >= grid_resolution / 2
blocked_t[fully_blocked] = 1
blocked_t[fully_open] = 0
transient = ~fully_blocked & ~fully_open
blocked_t[transient] = (
-travel_diff[transient] + grid_resolution / 2
) / grid_resolution
assert np.all(~np.isnan(blocked_t))
return blocked_t
def _gantry_angle_masks(
self, gantry_angles, gantry_tol, allow_missing_angles=False
):
masks = [
self._gantry_angle_mask(gantry_angle, gantry_tol)
for gantry_angle in gantry_angles
]
for mask in masks:
if np.all(mask == 0):
continue
# TODO: Apply mask by more than just gantry angle to appropriately
# extract beam index even when multiple beams have the same gantry
# angle
is_duplicate_gantry_angles = (
np.sum(np.abs(np.diff(np.concatenate([[0], mask, [0]])))) != 2
)
if is_duplicate_gantry_angles:
raise ValueError("Duplicate gantry angles not yet supported")
try:
assert np.all(np.sum(masks, axis=0) == 1), (
"Not all beams were captured by the gantry tolerance of "
" {}".format(gantry_tol)
if (
mosaiq_gantry_angle
not in logfile_mu_density_bygantry[logfile_group]
):
logfile_mu_density_bygantry[logfile_group][
mosaiq_gantry_angle
] = list(mu_density)
else:
assert np.all(
logfile_mu_density_bygantry[logfile_group][
mosaiq_gantry_angle
][0]
== mu_density[0]
)
assert np.all(
logfile_mu_density_bygantry[logfile_group][
mosaiq_gantry_angle
][1]
== mu_density[1]
)
logfile_mu_density_bygantry[logfile_group][mosaiq_gantry_angle][
2
] += mu_density[2]
return logfile_mu_density_bygantry