def _get_schema(self):
    # Schema comes from intake.source.base (a module-level import in the source).
    import dask.array as da
    if self._arr is None:
        self._arr = da.from_zarr(self.url, component=self.component,
                                 storage_options=self.storage_options,
                                 **self.kwargs)
        self.chunks = self._arr.chunks
        self.shape = self._arr.shape    # needed by the Schema below
        self.dtype = self._arr.dtype    # needed by the Schema below
        self.npartitions = self._arr.npartitions
    return Schema(dtype=str(self.dtype), shape=self.shape,
                  extra_metadata=self.metadata,
                  npartitions=self.npartitions,
                  chunks=self.chunks)
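
These _get_schema implementations are lazy: nothing touches storage until the schema is first requested. A minimal usage sketch, assuming the method above belongs to a driver class here called ZarrArraySource (name and constructor arguments assumed, not part of the source) and that a zarr store exists at the given path:

# Hypothetical usage of the lazy-schema pattern above.
source = ZarrArraySource(url="example.zarr", component=None,
                         storage_options=None, metadata={})   # args assumed
info = source.discover()   # first call runs _get_schema and opens the store
print(info["shape"], info["dtype"], info["npartitions"])

intake's DataSource.discover() is the public entry point that ends up calling _get_schema.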
def _get_schema(self):
    # The function header and file-opening lines were missing from this
    # fragment; they are reconstructed here, and the attribute holding
    # the storage options is assumed.
    from fsspec import open_files
    import dask.array as da
    if self._arr is None:
        files = open_files(self.path, mode='rb',
                           **(self.storage_options or {}))
        if self.shape is None:
            # Read shape/dtype from the first file's .npy header.
            arr = NumpyAccess(files[0])
            self.shape = arr.shape
            self.dtype = arr.dtype
            arrs = [arr] + [NumpyAccess(f, self.shape, self.dtype)
                            for f in files[1:]]
        else:
            arrs = [NumpyAccess(f, self.shape, self.dtype)
                    for f in files]
        # Chunk only along the first axis; -1 keeps each other axis whole.
        self.chunks = (self._chunks, ) + (-1, ) * (len(self.shape) - 1)
        self._arrs = [da.from_array(arr, self.chunks) for arr in arrs]
        if len(self._arrs) > 1:
            self._arr = da.stack(self._arrs)
        else:
            self._arr = self._arrs[0]
        self.chunks = self._arr.chunks
    return Schema(dtype=str(self.dtype), shape=self.shape,
                  extra_metadata=self.metadata,
                  npartitions=self._arr.npartitions,
                  chunks=self.chunks)
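
The chunks tuple built above splits only along the first axis; in dask, -1 in a chunks tuple means one chunk spanning the whole axis. A small self-contained illustration:

import numpy as np
import dask.array as da

x = da.from_array(np.ones((10, 4)), chunks=(5, -1))
print(x.chunks)   # ((5, 5), (4,)): split on axis 0, axis 1 kept whole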
def _parse_open_response(self, response):
    self.datashape = response['datashape']
    dtype_descr = response['dtype']
    if isinstance(dtype_descr, list):
        # Reformat because NumPy needs list of tuples
        dtype_descr = [tuple(x) for x in response['dtype']]
    self.dtype = dtype_descr
    self.shape = tuple(response['shape'] or ())
    self.npartitions = response['npartitions']
    self.metadata = response['metadata']
    self._schema = Schema(datashape=None, dtype=self.dtype,
                          shape=self.shape,
                          npartitions=self.npartitions,
                          metadata=self.metadata)
    self._source_id = response['source_id']
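
The tuple reformatting matters because the dtype description arrives over JSON, which turns tuples into lists, and numpy only accepts structured-dtype fields as tuples. For example:

import numpy as np

descr = [["x", "<f8"], ["y", "<i4"]]               # as decoded from JSON
dtype = np.dtype([tuple(field) for field in descr])
print(dtype)   # [('x', '<f8'), ('y', '<i4')]
# np.dtype(descr) itself would raise TypeError on the inner lists.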
def read_a_file(open_file, reader, kwargs):
    # Read one file into a dataframe and record which path each row came from.
    with open_file as of:
        df = reader(of, **kwargs)
        df['path'] = open_file.path
    return df


def _get_schema(self):
    # Method header and imports reconstructed; the name follows the
    # sibling _get_schema snippets.
    import dask
    import dask.dataframe as dd
    if self.dataframe is None:
        self.parts = [
            dask.delayed(read_a_file)(open_file, self.reader, self.kwargs)
            for open_file in self.files
        ]
        self.dataframe = dd.from_delayed(self.parts)
        self.npartitions = self.dataframe.npartitions
        self.shape = (None, len(self.dataframe.columns))
        self.dtype = self.dataframe.dtypes.to_dict()
        self._schema = Schema(npartitions=self.npartitions,
                              extra_metadata=self.metadata,
                              dtype=self.dtype,
                              shape=self.shape,
                              datashape=None)
    return self._schema
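
The same one-delayed-task-per-file pattern can be reproduced standalone with read_a_file from above; the glob path and the choice of pandas.read_csv as the reader are assumptions for illustration:

import dask
import dask.dataframe as dd
import pandas as pd
from fsspec import open_files

files = open_files("data/*.csv", mode="rt")        # path assumed
parts = [dask.delayed(read_a_file)(f, pd.read_csv, {}) for f in files]
df = dd.from_delayed(parts)                        # one partition per file
print(df.npartitions, list(df.columns))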
def _get_schema(self):
    # Method name and the module-level imports (warnings, collections,
    # numpy, intake) are assumed; the body below is from the source.
    self._run_stop_doc = self._get_run_stop()
    self._run_start_doc = self._get_run_start()
    self._descriptors = self._get_event_descriptors()
    self._offset = len(self._descriptors) + 1
    self.metadata.update({'start': self._run_start_doc})
    self.metadata.update({'stop': self._run_stop_doc})

    # Total documents: one run_start, every descriptor, every event,
    # plus a run_stop if the run has already finished.
    count = 1
    descriptor_uids = [doc['uid'] for doc in self._descriptors]
    count += len(descriptor_uids)
    for doc in self._descriptors:
        count += self._get_event_count(doc['uid'])
    count += (self._run_stop_doc is not None)

    self.npartitions = int(numpy.ceil(count / self.PARTITION_SIZE))
    self._schema = intake.source.base.Schema(
        datashape=None,
        dtype=None,
        shape=(count,),
        npartitions=self.npartitions,
        metadata=self.metadata)

    # Make a BlueskyEventStream for each stream_name.
    for doc in self._descriptors:
        if 'name' not in doc:
            warnings.warn(
                f"EventDescriptor {doc['uid']!r} has no 'name', likely "
                f"because it was generated using an old version of "
                f"bluesky. The name 'primary' will be used.")
    descriptors_by_name = collections.defaultdict(list)
    for doc in self._descriptors:
        descriptors_by_name[doc.get('name', 'primary')].append(doc)
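
A worked example of the partition arithmetic above, with assumed document counts and an assumed PARTITION_SIZE:

import math

PARTITION_SIZE = 100                 # assumed value for illustration
count = 1 + 2 + 500 + 1              # start + 2 descriptors + events + stop
print(math.ceil(count / PARTITION_SIZE))   # 6 partitions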
def _get_schema(self):
    urlpath = self._get_cache(self._urlpath)[0]
    if self._dataframe is None:
        self._open_dataset(urlpath)
    dtypes = self._dataframe._meta.dtypes.to_dict()
    dtypes = {n: str(t) for (n, t) in dtypes.items()}
    return base.Schema(datashape=None,
                       dtype=dtypes,
                       shape=(None, len(dtypes)),
                       npartitions=self._dataframe.npartitions,
                       extra_metadata={})
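
The _meta attribute of a dask dataframe (a private but stable dask detail) is an empty pandas frame that carries the column dtypes without loading any data; stringifying them keeps the schema serializable. A quick check:

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"a": [1], "b": [1.5]}), npartitions=1)
print({n: str(t) for n, t in ddf._meta.dtypes.to_dict().items()})
# {'a': 'int64', 'b': 'float64'}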
def _get_schema(self):
    from fsspec import open_files
    if self._files is None:
        urlpath = self._get_cache(self._urlpath)[0]
        self._files = open_files(
            urlpath, mode=self.mode, encoding=self.encoding,
            compression=self.compression,
            **self._storage_options)
        self.npartitions = len(self._files)
    return base.Schema(datashape=None,
                       dtype=None,
                       shape=(None, ),
                       npartitions=self.npartitions,
                       extra_metadata=self.metadata)
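
fsspec.open_files expands a URL or glob into a list of OpenFile objects without actually opening anything, which is why one file maps cleanly onto one partition here:

from fsspec import open_files

files = open_files("data/*.txt", mode="rt", compression=None)  # path assumed
print(len(files))       # becomes npartitions above
print(files[0].path)    # each OpenFile records its own path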