import os
import strax
import numba
import numpy as np

export, __all__ = strax.exporter()


# (5-10x) faster than np.sort(order=...), as np.sort looks at all fields
# TODO: maybe this should be a factory?
@export
@numba.jit(nopython=True, nogil=True, cache=True)
def sort_by_time(x):
    """Sort pulses by time, then channel.

    Assumes you have no more than 10k channels, and records don't span
    more than 100 days. TODO: FIX this
    """
    if len(x) == 0:
        # Nothing to do, and .min() on empty array doesn't work, so:
        return x
    # I couldn't get fast argsort on multiple keys to work in numba,
    # so build a single integer key: time (shifted to keep it small)
    # times 10k, plus the channel number as a tiebreaker.
    sort_key = (x['time'] - x['time'].min()) * 10000 + x['channel']
    sort_i = np.argsort(sort_key)
    return x[sort_i]
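
# Minimal usage sketch (not from the original file). The two-field dtype
# below is illustrative only; real strax records carry many more fields.
pulses = np.zeros(3, dtype=[('time', np.int64), ('channel', np.int16)])
pulses['time'] = [30, 10, 10]
pulses['channel'] = [0, 2, 1]
pulses = sort_by_time(pulses)
# -> times [10, 10, 30]; channel breaks the tie between the two t=10 pulses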
import numpy as np
import numba

import strax
from strax import utils
from strax.dtypes import peak_dtype, DIGITAL_SUM_WAVEFORM_CHANNEL

export, __all__ = strax.exporter()


@export
@utils.growing_result(dtype=peak_dtype(), chunk_size=int(1e4))
@numba.jit(nopython=True, nogil=True, cache=True)
def find_peaks(hits, adc_to_pe,
               gap_threshold=300,
               left_extension=20, right_extension=150,
               min_area=0,
               min_channels=2,
               _result_buffer=None, result_dtype=None):
    """Return peaks made from grouping hits together.

    Assumes all hits have the same dt.
    :param hits: Hits (or any intervals) to group
    :param adc_to_pe: ADC to PE conversion factor array (per channel)
    :param gap_threshold: No hits for this many ns means a new peak starts
    :param left_extension: Extend peaks by this many ns left
    :param right_extension: Extend peaks by this many ns right
    :param min_area: Peaks with a smaller area are not returned
    :param min_channels: Peaks with fewer contributing channels are not
        returned
    """
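
# Illustrative sketch (not from the original file) of the gap-grouping idea
# behind gap_threshold, shown on plain start times rather than the
# structured hit arrays find_peaks actually takes.
_starts = np.array([0, 50, 120, 900, 960])            # hit start times (ns)
_new_peak = np.diff(_starts) > 300                    # a large gap ends the peak
_group = np.concatenate([[0], np.cumsum(_new_peak)])
# _group -> [0, 0, 0, 1, 1]: hits 0-2 form one peak, hits 3-4 the next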
from concurrent import futures
from concurrent.futures import ProcessPoolExecutor
from functools import partial
import logging
import typing as ty
import os
import sys

import numpy as np

import strax

export, __all__ = strax.exporter()

try:
    import npshmex
    SHMExecutor = npshmex.ProcessPoolExecutor
    npshmex.register_array_wrapper(strax.Chunk, 'data')
except ImportError:
    # This is allowed to fail, it only crashes if allow_shm = True
    SHMExecutor = None


@export
class ProcessorComponents(ty.NamedTuple):
    """Specification to assemble a processor"""
    plugins: ty.Dict[str, strax.Plugin]
    loaders: ty.Dict[str, callable]
    savers: ty.Dict[str, ty.List[strax.Saver]]
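
# Minimal sketch (not from the original file): an empty specification,
# just to show the shape of the NamedTuple above.
components = ProcessorComponents(plugins={}, loaders={}, savers={})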
import glob
import os
import shutil

import numpy as np
import numba

import strax
from .common import to_pe, pax_file, get_resource, get_elife

export, __all__ = strax.exporter()

# The underscore is a digit separator (this literal equals 1701181327);
# run names follow a YYMMDD_HHMM scheme.
first_sr1_run = 170118_1327


@export
@strax.takes_config(
    strax.Option('safe_break_in_pulses', default=1000,
                 help="Time (ns) between pulse starts indicating a safe break "
                      "in the datastream -- peaks will not span this."),
    strax.Option('input_dir', type=str, track=False,
                 help="Directory where readers put data"),
    strax.Option('erase', default=False, track=False,
                 help="Delete reader data after processing"))
class DAQReader(strax.ParallelSourcePlugin):
    provides = 'raw_records'
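
# Hypothetical usage (not from the original file): register the plugin and
# set the options declared above; the directory path is a placeholder.
st = strax.Context(register=DAQReader,
                   config=dict(input_dir='/data/from_readers'))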
import os
import re
import socket

import pymongo

import strax

export, __all__ = strax.exporter()


@export
class RunDB(strax.StorageFrontend):
    """Frontend that searches RunDB MongoDB for data.

    Loads appropriate backends ranging from Files to S3.
    """

    def __init__(self,
                 mongo_url,
                 path='.',
                 s3_kwargs=None,
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        if s3_kwargs is None:
            # Avoid a mutable default argument
            s3_kwargs = {}
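
# Hypothetical usage (not from the original file); the URL is a placeholder.
st = strax.Context(
    storage=RunDB('mongodb://user:password@rundb.example.com:27017/run'))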
import collections
import datetime
import logging
import fnmatch
from functools import partial
import random
import string
import typing as ty
import warnings

import numexpr
import numpy as np
import pandas as pd

import strax

export, __all__ = strax.exporter()
__all__ += ['RUN_DEFAULTS_KEY']

RUN_DEFAULTS_KEY = 'strax_defaults'


@strax.takes_config(
    strax.Option(name='storage_converter', default=False,
                 help='If True, save data that is loaded from one frontend '
                      'through all willing other storage frontends.'),
    strax.Option(name='fuzzy_for', default=tuple(),
                 help='Tuple of plugin names for which no checks for version, '
                      'providing plugin, and config will be performed when '
                      'looking for data.'),
    strax.Option(name='fuzzy_for_options', default=tuple(),
                 help='Tuple of config options for which no checks will be '
                      'performed when looking for data.'))
@export
class Context:
    ...
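
# Hypothetical usage (not from the original file): context options such as
# fuzzy_for can be passed directly when constructing a strax.Context.
st = strax.Context(fuzzy_for=('peaks',))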
"""I/O via S3-compatible object stores.

Requires the following environment variables to be set:
 * S3_ACCESS_KEY_ID
 * S3_SECRET_ACCESS_KEY
"""
import json
import os
import tempfile

import boto3
from botocore.client import Config
from botocore.exceptions import ClientError

import strax
from strax import StorageFrontend

export, __all__ = strax.exporter()

# Track versions of S3 interface
VERSION = 2
BUCKET_NAME = 'snax_s3_v%d' % VERSION


@export
class SimpleS3Store(StorageFrontend):
    """Frontend for S3 stores that just checks if data exists.

    This uses boto3 for communication; see the boto3 docs for details of
    the client calls. S3 is an object store where each object is a chunk,
    and the bucket corresponds to a strax key (run / plugin).
    Currently, no run-level metadata is stored.
    """
"""
from ast import literal_eval
import json
import glob
import logging
import os
import shutil
import sys
import traceback
import time
import typing

import numpy as np

import strax

export, __all__ = strax.exporter()


@export
class CacheKey(typing.NamedTuple):
    run_id: str
    data_type: str
    lineage: dict


@export
class NotCachedException(Exception):
    pass
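
# Illustrative sketch (not from the original file): a cache key identifies
# one run/data_type together with the plugin lineage that produced it.
key = CacheKey(run_id='170118_1327', data_type='peaks', lineage={})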
@export
class FileStorage:
import builtins
import typing as ty

from immutabledict import immutabledict

import strax

export, __all__ = strax.exporter()

# Placeholder value for omitted values.
# Use instead of None since None might be a proper value/default
OMITTED = '<OMITTED>'
__all__.append('OMITTED')


@export
class InvalidConfiguration(Exception):
    pass


@export
def takes_config(*options):
    """Decorator for plugin classes, to specify which options it takes.

    :param options: Option instances of options this plugin takes.
    """
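
# Illustrative sketch (not from the original file): a plugin declaring the
# options it takes; 'threshold' is a made-up option name.
@takes_config(strax.Option('threshold', default=5,
                           help='Hit threshold (illustrative only)'))
class SomePlugin(strax.Plugin):
    pass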
import numpy as np
import numba

import strax

export, __all__ = strax.exporter()


@export
def split_peaks(peaks, records, to_pe, algorithm='local_minimum', **kwargs):
    """Return peaks split according to algorithm, with waveforms summed
    and widths computed.

    :param peaks: Original peaks. Sum waveform must have been built
        and properties must have been computed (if you use them)
    :param records: Records from which peaks were built
    :param to_pe: ADC to PE conversion factor array (of n_channels)
    :param algorithm: 'local_minimum' or 'natural_breaks'.
        Any other options are passed to the algorithm.
    """
    # NaturalBreaksSplitter is assumed to exist alongside
    # LocalMinimumSplitter, per the algorithm names in the docstring.
    splitter = dict(local_minimum=LocalMinimumSplitter,
                    natural_breaks=NaturalBreaksSplitter)[algorithm]()
    return splitter(peaks, records, to_pe, **kwargs)
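
# Hypothetical call (not from the original file), with peaks/records/to_pe
# as produced earlier in the pipeline:
new_peaks = split_peaks(peaks, records, to_pe, algorithm='natural_breaks')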