Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def group_consecutive_logfiles(file_hashes, index):
times = np.array([index[key]["local_time"] for key in file_hashes]).astype(
np.datetime64
)
sort_reference = np.argsort(times)
file_hashes = file_hashes[sort_reference]
times = times[sort_reference]
hours_4 = np.array(60 * 60 * 4).astype(np.timedelta64)
split_locations = np.where(np.diff(times) >= hours_4)[0] + 1
return np.split(file_hashes, split_locations)
def get_mosaiq_delivery_data_bygantry(mosaiq_delivery_data):
mu = np.array(mosaiq_delivery_data.monitor_units)
mlc = np.array(mosaiq_delivery_data.mlc)
jaw = np.array(mosaiq_delivery_data.jaw)
gantry_angles = np.array(mosaiq_delivery_data.gantry)
unique_mosaiq_gantry_angles = np.unique(gantry_angles)
mosaiq_delivery_data_bygantry = dict()
for mosaiq_gantry_angle in unique_mosaiq_gantry_angles:
gantry_angle_matches = gantry_angles == mosaiq_gantry_angle
diff_mu = np.concatenate([[0], np.diff(mu)])[gantry_angle_matches]
gantry_angle_specific_mu = np.cumsum(diff_mu)
mosaiq_delivery_data_bygantry[mosaiq_gantry_angle] = dict()
mosaiq_delivery_data_bygantry[mosaiq_gantry_angle][
"mu"
] = gantry_angle_specific_mu
mosaiq_delivery_data_bygantry[mosaiq_gantry_angle]["mlc"] = mlc[
gantry_angle_matches
]
mosaiq_delivery_data_bygantry[mosaiq_gantry_angle]["jaw"] = jaw[
gantry_angle_matches
]
return mosaiq_delivery_data_bygantry
def _gantry_angle_mask(self, gantry_angle, gantry_angle_tol):
near_angle = np.abs(np.array(self.gantry) - gantry_angle) <= gantry_angle_tol
assert np.all(np.diff(np.where(near_angle)[0]) == 1)
return near_angle
def search_for_centre_of_largest_bounded_circle(x, y, callback=None):
"""Find the centre of the largest bounded circle within the insert."""
insert = shapely_insert(x, y)
boundary = insert.boundary
centroid = insert.centroid
furthest_distance = np.hypot(
np.diff(insert.bounds[::2]), np.diff(insert.bounds[1::2])
)
def minimising_function(optimiser_input):
x, y = optimiser_input
point = shapely.geometry.Point(x, y)
if insert.contains(point):
edge_distance = point.distance(boundary)
else:
edge_distance = -point.distance(boundary)
return -edge_distance
x0 = np.squeeze(centroid.coords)
niter = 200
T = furthest_distance / 3
def get_increment(self):
""" minimum step-size increment
Returns
-------
increment : float
"""
steps = np.diff(self.x)
if np.isclose(steps.min(), steps.mean()):
return steps.mean()
else:
return steps.min()
def _from_mosaiq_base(cls, cursor, field_id):
txfield_results, txfieldpoint_results = fetch_and_verify_mosaiq_sql(
cursor, field_id
)
total_mu = np.array(txfield_results[0]).astype(float)
cumulative_percentage_mu = txfieldpoint_results[:, 0].astype(float)
if np.shape(cumulative_percentage_mu) == ():
mu_per_control_point = [0, total_mu]
else:
cumulative_mu = cumulative_percentage_mu * total_mu / 100
mu_per_control_point = np.concatenate([[0], np.diff(cumulative_mu)])
monitor_units = np.cumsum(mu_per_control_point).tolist()
mlc_a = np.squeeze(decode_msq_mlc(txfieldpoint_results[:, 1].astype(bytes))).T
mlc_b = np.squeeze(decode_msq_mlc(txfieldpoint_results[:, 2].astype(bytes))).T
msq_gantry_angle = txfieldpoint_results[:, 3].astype(float)
msq_collimator_angle = txfieldpoint_results[:, 4].astype(float)
coll_y1 = txfieldpoint_results[:, 5].astype(float)
coll_y2 = txfieldpoint_results[:, 6].astype(float)
mlc, jaw = collimation_to_bipolar_mm(mlc_a, mlc_b, coll_y1, coll_y2)
gantry = convert_IEC_angle_to_bipolar(msq_gantry_angle)
collimator = convert_IEC_angle_to_bipolar(msq_collimator_angle)