# Secure your code as it's written. Use Snyk Code to scan source code in minutes — no build needed — and fix issues immediately.
def read(self, dirname: str, executor=None):
    """Iterate over strax results stored in directory *dirname*.

    Reads ``metadata.json`` from the directory, then yields one loaded
    array per chunk listed there. If *executor* is given, loading is
    submitted to it and Futures are yielded instead of arrays.
    """
    # Use os.path.join (as the chunk loop below already does) instead of
    # string concatenation, so the path is correct on every platform.
    with open(os.path.join(dirname, 'metadata.json'), mode='r') as f:
        metadata = json.load(f)
    if not metadata['chunks']:
        self.log.warning(f"No data files in {dirname}?")
    # The dtype was serialized as its repr; rebuild it safely.
    dtype = literal_eval(metadata['dtype'])
    compressor = metadata['compressor']
    kwargs = dict(dtype=dtype, compressor=compressor)
    for chunk_info in metadata['chunks']:
        fn = os.path.join(dirname, chunk_info['filename'])
        if executor is None:
            yield strax.load_file(fn, **kwargs)
        else:
            # Defer the actual load; the caller receives a Future.
            yield executor.submit(strax.load_file, fn, **kwargs)
def _read_chunk(self, dirname, chunk_info, dtype, compressor):
    """Load a single chunk file from *dirname* and return its data."""
    chunk_path = osp.join(dirname, chunk_info['filename'])
    return strax.load_file(chunk_path, dtype=dtype, compressor=compressor)
def _read_chunk(self, backend_key, chunk_info, dtype, compressor):
    """Download one chunk object from S3 and decode it into an array."""
    # Older metadata recorded only a bare filename; rebuild the full
    # object key from the backend key. Temporary backward-compat shim.
    if 'filename' in chunk_info:
        chunk_info['key_name'] = f"{backend_key}/{chunk_info['filename']}"
    with tempfile.SpooledTemporaryFile() as buffer:
        self.s3.download_fileobj(
            Bucket=BUCKET_NAME,
            Key=chunk_info['key_name'],
            Fileobj=buffer)
        # Rewind so load_file reads from the start of the download.
        buffer.seek(0)
        return strax.load_file(buffer, dtype=dtype, compressor=compressor)
def _read_chunk(self, zipn_and_dirn, chunk_info, dtype, compressor):
    """Load one chunk stored as a member of a zip archive."""
    zip_path, inner_dir = zipn_and_dirn
    member = inner_dir + '/' + chunk_info['filename']
    with zipfile.ZipFile(zip_path) as archive, archive.open(member) as stream:
        return strax.load_file(stream, dtype=dtype, compressor=compressor)
def _load_chunk(self, path, kind='central'):
    """Load and time-sort all reader files under *path*.

    For kind == 'central' the full sorted records are returned; otherwise
    the records are trimmed at a safe break (left side for 'post').
    Deletes *path* afterwards when the 'erase' config option is set.
    """
    loaded = []
    for fn in glob.glob(f'{path}/reader_*'):
        loaded.append(strax.load_file(fn,
                                      compressor='blosc',
                                      dtype=strax.record_dtype()))
    records = strax.sort_by_time(np.concatenate(loaded))
    if kind != 'central':
        result = strax.from_break(
            records,
            safe_break=self.config['safe_break_in_pulses'],
            left=(kind == 'post'),
            tolerant=True)
    else:
        result = records
    if self.config['erase']:
        shutil.rmtree(path)
    return result