# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_find_peaks(hits, min_hits, min_area):
    """Check that strax.find_peaks respects its min_hits / min_area cuts.

    NOTE(review): the body of the final ``if`` (the no-requirements case,
    where every hit should land in some peak) is truncated in this chunk —
    its assertions are missing and must be restored from the original file.

    :param hits: hits array to cluster into peaks
    :param min_hits: minimum number of hits per peak passed to find_peaks
    :param min_area: minimum peak area passed to find_peaks
    """
    gap_threshold = 10
    peaks = strax.find_peaks(hits,
                             to_pe=np.ones(1),
                             right_extension=0, left_extension=0,
                             gap_threshold=gap_threshold,
                             min_hits=min_hits,
                             min_area=min_area)
    # Check sanity
    assert np.all(peaks['length'] > 0)
    # Check if requirements satisfied
    if min_area != 0:
        assert np.all(peaks['area'] >= min_area)
    if min_hits != 1:
        assert np.all(peaks['n_hits'] >= min_hits)
    # Without requirements, all hits must occur in a peak
    if min_area == 0 and min_hits == 1:
:param gap_threshold: Minimum gap between peaks
:param left_extension: Extend groups by this many ns left
:param right_extension: " " right
:param max_duration: Maximum group duration. See strax.find_peaks for
    what happens if this is exceeded
:return: time, endtime arrays of group boundaries
"""
# NOTE(review): the ``def`` line and the opening of this docstring are
# missing from this chunk — this is the tail of a peak-grouping helper
# that clusters existing peaks by reusing the hit-level peakfinder.
# Mock up a "hits" array so we can just use the existing peakfinder
# It doesn't work on raw peaks, since they might have different dts
# TODO: is there no cleaner way?
fake_hits = np.zeros(len(peaks), dtype=strax.hit_dtype)
# dt=1 so each fake hit spans exactly `length` ns
fake_hits['dt'] = 1
fake_hits['time'] = peaks['time']
# TODO: could this cause int overrun nonsense anywhere?
fake_hits['length'] = peaks['endtime'] - peaks['time']
# min_hits=1, min_area=0 disable all cuts: every input peak must
# end up inside some group.
fake_peaks = strax.find_peaks(
    fake_hits, to_pe=np.zeros(1),
    gap_threshold=gap_threshold,
    left_extension=left_extension, right_extension=right_extension,
    min_hits=1, min_area=0,
    max_duration=max_duration)
return fake_peaks['time'], strax.endtime(fake_peaks)
:param peaks: Peaks to group
:param gap_threshold: Minimum gap between peaks
:param left_extension: Extend groups by this many ns left
:param right_extension: " " right
:return: time, endtime arrays of group boundaries
"""
# NOTE(review): the ``def`` line and the opening of this docstring are
# missing from this chunk — this is the tail of a second (newer API)
# version of the peak-grouping helper; note it uses ``adc_to_pe`` /
# ``min_channels`` instead of ``to_pe`` / ``min_hits`` above.
# Mock up a "hits" array so we can just use the existing peakfinder
# It doesn't work on raw peaks, since they might have different dts
# TODO: is there no cleaner way?
fake_hits = np.zeros(len(peaks), dtype=strax.hit_dtype)
# dt=1 so each fake hit spans exactly `length` ns
fake_hits['dt'] = 1
# Unit area per fake hit, so the min_area=0 cut below is a no-op
fake_hits['area'] = 1
fake_hits['time'] = peaks['time']
# TODO: could this cause int overrun nonsense anywhere?
fake_hits['length'] = strax.endtime(peaks) - peaks['time']
fake_peaks = strax.find_peaks(
    fake_hits, adc_to_pe=np.ones(1),
    gap_threshold=gap_threshold,
    left_extension=left_extension, right_extension=right_extension,
    min_channels=1, min_area=0)
return fake_peaks['time'], strax.endtime(fake_peaks)
def compute(self, records):
    """Build peaks from records: find hits, cluster, sum, split.

    :param records: record array to process (assumed time-sorted when
        diagnose_sorting is enabled — TODO confirm upstream guarantee)
    :return: peaks array with summed waveforms and computed widths
    """
    # TODO: Duplicate work
    hits = strax.sort_by_time(strax.find_hits(records))
    peaks = strax.find_peaks(hits, to_pe,
                             result_dtype=self.dtype)
    strax.sum_waveform(peaks, records, to_pe)
    # Splitting can produce new peaks, so widths are computed afterwards
    peaks = strax.split_peaks(peaks, records, to_pe)
    strax.compute_widths(peaks)
    if self.config['diagnose_sorting']:
        assert np.diff(records['time']).min() >= 0, "Records not sorted"
        assert np.diff(hits['time']).min() >= 0, "Hits not sorted"
        assert np.all(peaks['time'][1:]
                      >= strax.endtime(peaks)[:-1]), "Peaks not disjoint"
    return peaks
def compute(self, raw_records):
    """Find peaks in tail-excluded records and sum their waveforms.

    :param raw_records: raw record array to process
    :return: dict with key 'peak_details' holding the summed peaks
        (gap_threshold=300 ns, at least 3 hits per peak)
    """
    cleaned = strax.exclude_tails(raw_records, to_pe)
    hits = strax.find_hits(cleaned)
    # Zero out record samples that fall outside any hit (in place)
    strax.cut_outside_hits(cleaned, hits)
    peak_details = strax.find_peaks(hits, to_pe,
                                    gap_threshold=300,
                                    min_hits=3)
    strax.sum_waveform(peak_details, cleaned, to_pe)
    return {'peak_details': peak_details}